diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 103203b0dba..e4e4ec3e957 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -874,9 +874,8 @@ impl Builder {
     }
 
     fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
-        use crate::runtime::scheduler::{self, current_thread, CurrentThread};
+        use crate::runtime::scheduler::{self, CurrentThread};
         use crate::runtime::{Config, Scheduler};
-        use std::sync::Arc;
 
         let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
@@ -884,12 +883,19 @@ impl Builder {
         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
         let blocking_spawner = blocking_pool.spawner().clone();
 
+        // Generate a rng seed for this runtime.
+        let seed_generator_1 = self.seed_generator.next_generator();
+        let seed_generator_2 = self.seed_generator.next_generator();
+
         // And now put a single-threaded scheduler on top of the timer. When
         // there are no futures ready to do something, it'll let the timer or
         // the reactor to generate some new stimuli for the futures to continue
         // in their life.
         let scheduler = CurrentThread::new(
             driver,
+            driver_handle,
+            blocking_spawner,
+            seed_generator_2,
             Config {
                 before_park: self.before_park.clone(),
                 after_unpark: self.after_unpark.clone(),
@@ -898,20 +904,15 @@ impl Builder {
                 #[cfg(tokio_unstable)]
                 unhandled_panic: self.unhandled_panic.clone(),
                 disable_lifo_slot: self.disable_lifo_slot,
-                seed_generator: self.seed_generator.next_generator(),
+                seed_generator: seed_generator_1,
             },
         );
-        let inner = Arc::new(current_thread::Handle {
-            spawner: scheduler.spawner().clone(),
-            driver: driver_handle,
-            blocking_spawner,
-            seed_generator: self.seed_generator.next_generator(),
-        });
-        let inner = scheduler::Handle::CurrentThread(inner);
+
+        let handle = scheduler::Handle::CurrentThread(scheduler.handle().clone());
 
         Ok(Runtime {
             scheduler: Scheduler::CurrentThread(scheduler),
-            handle: Handle { inner },
+            handle: Handle { inner: handle },
             blocking_pool,
         })
     }
@@ -992,10 +993,10 @@ cfg_test_util! {
 cfg_rt_multi_thread! {
     impl Builder {
         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
+            use crate::loom::sync::Arc;
             use crate::loom::sys::num_cpus;
             use crate::runtime::{Config, Scheduler};
             use crate::runtime::scheduler::{self, multi_thread, MultiThread};
-            use std::sync::Arc;
 
             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
 
diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs
index 431e2f1f083..320fcb54ad5 100644
--- a/tokio/src/runtime/scheduler/current_thread.rs
+++ b/tokio/src/runtime/scheduler/current_thread.rs
@@ -27,8 +27,8 @@ pub(crate) struct CurrentThread {
     /// driver.
     notify: Notify,
 
-    /// Sendable task spawner
-    spawner: Spawner,
+    /// Shared handle to the scheduler
+    handle: Arc<Handle>,
 
     /// This is usually None, but right before dropping the CurrentThread
     /// scheduler, it is changed to `Some` with the context being the runtime's
@@ -38,10 +38,9 @@ pub(crate) struct CurrentThread {
 }
 
 /// Handle to the current thread scheduler
-#[derive(Debug)]
 pub(crate) struct Handle {
-    /// Task spawner
-    pub(crate) spawner: Spawner,
+    /// Scheduler state shared across threads
+    shared: Shared,
 
     /// Resource driver handles
     pub(crate) driver: driver::Handle,
@@ -57,10 +56,7 @@ pub(crate) struct Handle {
 /// a function that will perform the scheduling work and acts as a capability token.
 struct Core {
     /// Scheduler run queue
-    tasks: VecDeque<task::Notified<Arc<Shared>>>,
-
-    /// Sendable task spawner
-    spawner: Spawner,
+    tasks: VecDeque<task::Notified<Arc<Handle>>>,
 
     /// Current tick
     tick: u32,
@@ -78,18 +74,13 @@ struct Core {
     unhandled_panic: bool,
 }
 
-#[derive(Clone)]
-pub(crate) struct Spawner {
-    shared: Arc<Shared>,
-}
-
 /// Scheduler state shared between threads.
 struct Shared {
     /// Remote run queue. None if the `Runtime` has been dropped.
-    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
+    queue: Mutex<Option<VecDeque<task::Notified<Arc<Handle>>>>>,
 
     /// Collection of all active tasks spawned onto this executor.
-    owned: OwnedTasks<Arc<Shared>>,
+    owned: OwnedTasks<Arc<Handle>>,
 
     /// Unpark the blocked thread.
     unpark: Unpark,
@@ -109,8 +100,8 @@ struct Shared {
 
 /// Thread-local context.
 struct Context {
-    /// Handle to the spawner
-    spawner: Spawner,
+    /// Scheduler handle
+    handle: Arc<Handle>,
 
     /// Scheduler core, enabling the holder of `Context` to execute the
     /// scheduler.
@@ -124,11 +115,17 @@ const INITIAL_CAPACITY: usize = 64;
 scoped_thread_local!(static CURRENT: Context);
 
 impl CurrentThread {
-    pub(crate) fn new(driver: Driver, config: Config) -> CurrentThread {
+    pub(crate) fn new(
+        driver: Driver,
+        driver_handle: driver::Handle,
+        blocking_spawner: blocking::Spawner,
+        seed_generator: RngSeedGenerator,
+        config: Config,
+    ) -> CurrentThread {
         let unpark = driver.unpark();
 
-        let spawner = Spawner {
-            shared: Arc::new(Shared {
+        let handle = Arc::new(Handle {
+            shared: Shared {
                 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                 owned: OwnedTasks::new(),
                 unpark,
@@ -136,12 +133,14 @@ impl CurrentThread {
                 config,
                 scheduler_metrics: SchedulerMetrics::new(),
                 worker_metrics: WorkerMetrics::new(),
-            }),
-        };
+            },
+            driver: driver_handle,
+            blocking_spawner,
+            seed_generator,
+        });
 
         let core = AtomicCell::new(Some(Box::new(Core {
             tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
-            spawner: spawner.clone(),
             tick: 0,
             driver: Some(driver),
             metrics: MetricsBatch::new(),
@@ -151,13 +150,13 @@ impl CurrentThread {
         CurrentThread {
             core,
             notify: Notify::new(),
-            spawner,
+            handle,
             context_guard: None,
         }
     }
 
-    pub(crate) fn spawner(&self) -> &Spawner {
-        &self.spawner
+    pub(crate) fn handle(&self) -> &Arc<Handle> {
+        &self.handle
     }
 
     #[track_caller]
@@ -201,7 +200,7 @@ impl CurrentThread {
 
         Some(CoreGuard {
             context: Context {
-                spawner: self.spawner.clone(),
+                handle: self.handle.clone(),
                 core: RefCell::new(Some(core)),
             },
             scheduler: self,
@@ -228,16 +227,16 @@ impl Drop for CurrentThread {
             // Drain the OwnedTasks collection. This call also closes the
             // collection, ensuring that no tasks are ever pushed after this
             // call returns.
-            context.spawner.shared.owned.close_and_shutdown_all();
+            context.handle.shared.owned.close_and_shutdown_all();
 
             // Drain local queue
             // We already shut down every task, so we just need to drop the task.
-            while let Some(task) = core.pop_task() {
+            while let Some(task) = core.pop_task(&self.handle) {
                 drop(task);
             }
 
             // Drain remote queue and set it to None
-            let remote_queue = core.spawner.shared.queue.lock().take();
+            let remote_queue = self.handle.shared.queue.lock().take();
 
             // Using `Option::take` to replace the shared queue with `None`.
             // We already shut down every task, so we just need to drop the task.
@@ -247,10 +246,10 @@ impl Drop for CurrentThread {
                 }
             }
 
-            assert!(context.spawner.shared.owned.is_empty());
+            assert!(context.handle.shared.owned.is_empty());
 
             // Submit metrics
-            core.metrics.submit(&core.spawner.shared.worker_metrics);
+            core.metrics.submit(&self.handle.shared.worker_metrics);
 
             // Shutdown the resource drivers
             if let Some(driver) = core.driver.as_mut() {
@@ -271,19 +270,19 @@ impl fmt::Debug for CurrentThread {
 
 // ===== impl Core =====
 
 impl Core {
-    fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> {
+    fn pop_task(&mut self, handle: &Handle) -> Option<task::Notified<Arc<Handle>>> {
         let ret = self.tasks.pop_front();
-        self.spawner
+        handle
             .shared
             .worker_metrics
             .set_queue_depth(self.tasks.len());
         ret
     }
 
-    fn push_task(&mut self, task: task::Notified<Arc<Shared>>) {
+    fn push_task(&mut self, handle: &Handle, task: task::Notified<Arc<Handle>>) {
         self.tasks.push_back(task);
         self.metrics.inc_local_schedule_count();
-        self.spawner
+        handle
             .shared
             .worker_metrics
             .set_queue_depth(self.tasks.len());
@@ -305,7 +304,7 @@ impl Context {
     fn park(&self, mut core: Box<Core>) -> Box<Core> {
         let mut driver = core.driver.take().expect("driver missing");
 
-        if let Some(f) = &self.spawner.shared.config.before_park {
+        if let Some(f) = &self.handle.shared.config.before_park {
             // Incorrect lint, the closures are actually different types so `f`
             // cannot be passed as an argument to `enter`.
             #[allow(clippy::redundant_closure)]
@@ -318,7 +317,7 @@ impl Context {
         if core.tasks.is_empty() {
             // Park until the thread is signaled
             core.metrics.about_to_park();
-            core.metrics.submit(&core.spawner.shared.worker_metrics);
+            core.metrics.submit(&self.handle.shared.worker_metrics);
 
             let (c, _) = self.enter(core, || {
                 driver.park();
@@ -328,7 +327,7 @@ impl Context {
             core.metrics.returned_from_park();
         }
 
-        if let Some(f) = &self.spawner.shared.config.after_unpark {
+        if let Some(f) = &self.handle.shared.config.after_unpark {
             // Incorrect lint, the closures are actually different types so `f`
             // cannot be passed as an argument to `enter`.
             #[allow(clippy::redundant_closure)]
@@ -344,7 +343,7 @@ impl Context {
     fn park_yield(&self, mut core: Box<Core>) -> Box<Core> {
         let mut driver = core.driver.take().expect("driver missing");
 
-        core.metrics.submit(&core.spawner.shared.worker_metrics);
+        core.metrics.submit(&self.handle.shared.worker_metrics);
         let (mut core, _) = self.enter(core, || {
             driver.park_timeout(Duration::from_millis(0));
         });
@@ -368,36 +367,40 @@ impl Context {
     }
 }
 
-// ===== impl Spawner =====
+// ===== impl Handle =====
 
-impl Spawner {
+impl Handle {
     /// Spawns a future onto the `CurrentThread` scheduler
-    pub(crate) fn spawn<F>(&self, future: F, id: crate::runtime::task::Id) -> JoinHandle<F::Output>
+    pub(crate) fn spawn<F>(
+        me: &Arc<Self>,
+        future: F,
+        id: crate::runtime::task::Id,
+    ) -> JoinHandle<F::Output>
     where
         F: crate::future::Future + Send + 'static,
         F::Output: Send + 'static,
     {
-        let (handle, notified) = self.shared.owned.bind(future, self.shared.clone(), id);
+        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
 
         if let Some(notified) = notified {
-            self.shared.schedule(notified);
+            me.schedule(notified);
        }
 
         handle
     }
 
-    fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
+    fn pop(&self) -> Option<task::Notified<Arc<Handle>>> {
         match self.shared.queue.lock().as_mut() {
             Some(queue) => queue.pop_front(),
             None => None,
         }
     }
 
-    fn waker_ref(&self) -> WakerRef<'_> {
+    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
         // Set woken to true when enter block_on, ensure outer future
         // be polled for the first time when enter loop
-        self.shared.woken.store(true, Release);
-        waker_ref(&self.shared)
+        me.shared.woken.store(true, Release);
+        waker_ref(me)
     }
 
     // reset woken to false and return original value
@@ -409,14 +412,13 @@ impl Spawner {
 cfg_metrics! {
     impl Handle {
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
-            &self.spawner.shared.scheduler_metrics
+            &self.shared.scheduler_metrics
         }
 
         pub(crate) fn injection_queue_depth(&self) -> usize {
             // TODO: avoid having to lock. The multi-threaded injection queue
             // could probably be used here.
-            self.spawner
-                .shared
+            self.shared
                 .queue
                 .lock()
                 .as_ref()
@@ -426,46 +428,46 @@ cfg_metrics! {
 
         pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
             assert_eq!(0, worker);
-            &self.spawner.shared.worker_metrics
+            &self.shared.worker_metrics
         }
     }
 }
 
-impl fmt::Debug for Spawner {
+impl fmt::Debug for Handle {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-        fmt.debug_struct("Spawner").finish()
+        fmt.debug_struct("current_thread::Handle { ... }").finish()
     }
 }
 
 // ===== impl Shared =====
 
-impl Schedule for Arc<Shared> {
+impl Schedule for Arc<Handle> {
     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
-        self.owned.remove(task)
+        self.shared.owned.remove(task)
     }
 
     fn schedule(&self, task: task::Notified<Self>) {
         CURRENT.with(|maybe_cx| match maybe_cx {
-            Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => {
+            Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
                 let mut core = cx.core.borrow_mut();
 
                 // If `None`, the runtime is shutting down, so there is no need
                 // to schedule the task.
                 if let Some(core) = core.as_mut() {
-                    core.push_task(task);
+                    core.push_task(self, task);
                 }
             }
             _ => {
                 // Track that a task was scheduled from **outside** of the runtime.
-                self.scheduler_metrics.inc_remote_schedule_count();
+                self.shared.scheduler_metrics.inc_remote_schedule_count();
 
                 // If the queue is None, then the runtime has shut down. We
                 // don't need to do anything with the notification in that case.
-                let mut guard = self.queue.lock();
+                let mut guard = self.shared.queue.lock();
 
                 if let Some(queue) = guard.as_mut() {
                     queue.push_back(task);
                     drop(guard);
-                    self.unpark.unpark();
+                    self.shared.unpark.unpark();
                 }
             }
         });
@@ -475,7 +477,7 @@ impl Schedule for Arc<Shared> {
 
     fn unhandled_panic(&self) {
         use crate::runtime::UnhandledPanic;
 
-        match self.config.unhandled_panic {
+        match self.shared.config.unhandled_panic {
             UnhandledPanic::Ignore => {
                 // Do nothing
             }
@@ -484,13 +486,13 @@ impl Schedule for Arc<Shared> {
                 // `CURRENT` should match with `&self`, i.e. there is no
                 // opportunity for a nested scheduler to be called.
                 CURRENT.with(|maybe_cx| match maybe_cx {
-                    Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => {
+                    Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
                         let mut core = cx.core.borrow_mut();
 
                         // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                         if let Some(core) = core.as_mut() {
                             core.unhandled_panic = true;
-                            self.owned.close_and_shutdown_all();
+                            self.shared.owned.close_and_shutdown_all();
                         }
                     }
                     _ => unreachable!("runtime core not set in CURRENT thread-local"),
@@ -501,15 +503,15 @@ impl Schedule for Arc<Shared> {
     }
 }
 
-impl Wake for Shared {
+impl Wake for Handle {
     fn wake(arc_self: Arc<Self>) {
         Wake::wake_by_ref(&arc_self)
     }
 
     /// Wake by reference
     fn wake_by_ref(arc_self: &Arc<Self>) {
-        arc_self.woken.store(true, Release);
-        arc_self.unpark.unpark();
+        arc_self.shared.woken.store(true, Release);
+        arc_self.shared.unpark.unpark();
     }
 }
 
@@ -527,13 +529,15 @@ impl CoreGuard<'_> {
     fn block_on<F: Future>(self, future: F) -> F::Output {
         let ret = self.enter(|mut core, context| {
             let _enter = crate::runtime::enter(false);
-            let waker = context.spawner.waker_ref();
+            let waker = Handle::waker_ref(&context.handle);
             let mut cx = std::task::Context::from_waker(&waker);
 
             pin!(future);
 
             'outer: loop {
-                if core.spawner.reset_woken() {
+                let handle = &context.handle;
+
+                if handle.reset_woken() {
                     let (c, res) = context.enter(core, || {
                         crate::coop::budget(|| future.as_mut().poll(&mut cx))
                     });
@@ -545,7 +549,7 @@ impl CoreGuard<'_> {
                     }
                 }
 
-                for _ in 0..core.spawner.shared.config.event_interval {
+                for _ in 0..handle.shared.config.event_interval {
                     // Make sure we didn't hit an unhandled_panic
                     if core.unhandled_panic {
                         return (core, None);
@@ -555,10 +559,10 @@ impl CoreGuard<'_> {
                 let tick = core.tick;
                 core.tick = core.tick.wrapping_add(1);
 
-                let entry = if tick % core.spawner.shared.config.global_queue_interval == 0 {
-                    core.spawner.pop().or_else(|| core.tasks.pop_front())
+                let entry = if tick % handle.shared.config.global_queue_interval == 0 {
+                    handle.pop().or_else(|| core.tasks.pop_front())
                 } else {
-                    core.tasks.pop_front().or_else(|| core.spawner.pop())
+                    core.tasks.pop_front().or_else(|| handle.pop())
                 };
 
                 let task = match entry {
@@ -571,7 +575,7 @@ impl CoreGuard<'_> {
                     }
                 };
 
-                let task = context.spawner.shared.owned.assert_owner(task);
+                let task = context.handle.shared.owned.assert_owner(task);
 
                 let (c, _) = context.run_task(core, || {
                     task.run();
diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs
index fc277782dbb..84714ef94a2 100644
--- a/tokio/src/runtime/scheduler/mod.rs
+++ b/tokio/src/runtime/scheduler/mod.rs
@@ -59,12 +59,11 @@ impl Handle {
 
 cfg_rt! {
     use crate::future::Future;
+    use crate::loom::sync::Arc;
     use crate::runtime::{blocking, task::Id};
     use crate::task::JoinHandle;
     use crate::util::RngSeedGenerator;
 
-    use std::sync::Arc;
-
     impl Handle {
         pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
             match self {
@@ -81,7 +80,7 @@ cfg_rt! {
             F::Output: Send + 'static,
         {
             match self {
-                Handle::CurrentThread(h) => h.spawner.spawn(future, id),
+                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
 
                 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
                 Handle::MultiThread(h) => h.spawner.spawn(future, id),