From 7d72622f2f4a9b21807dd360871c2d2d633feab2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 14 Oct 2022 14:04:48 -0700 Subject: [PATCH] rt: remove a reference to internal time handle This patch removes a handle to the internal runtime driver handle held by the runtime. This is another step towards reducing the number of Arc refs across the runtime internals. Specifically, this change is part of an effort to remove an Arc in the time driver itself. --- tokio/src/runtime/driver.rs | 80 ++++++++++++------- tokio/src/runtime/scheduler/current_thread.rs | 6 +- tokio/src/runtime/scheduler/mod.rs | 5 +- .../runtime/scheduler/multi_thread/park.rs | 24 +++--- .../runtime/scheduler/multi_thread/worker.rs | 10 +-- tokio/src/runtime/time/mod.rs | 33 ++++---- 6 files changed, 88 insertions(+), 70 deletions(-) diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 74d9009240a..37aab793bcf 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -58,16 +58,16 @@ impl Driver { )) } - pub(crate) fn park(&mut self) { - self.inner.park() + pub(crate) fn park(&mut self, handle: &Handle) { + self.inner.park(handle) } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.inner.park_timeout(duration) + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + self.inner.park_timeout(handle, duration) } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown() + pub(crate) fn shutdown(&mut self, handle: &Handle) { + self.inner.shutdown(handle) } } @@ -80,6 +80,18 @@ impl Handle { self.io.unpark(); } + + cfg_time! { + /// Returns a reference to the time driver handle. + /// + /// Panics if no time driver is present + #[track_caller] + pub(crate) fn time(&self) -> &crate::runtime::time::Handle { + self.time + .as_ref() + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + } + } } // ===== io driver ===== @@ -121,30 +133,21 @@ cfg_io_driver! { } impl IoStack { - /* - pub(crate) fn handle(&self) -> IoHandle { - match self { - IoStack::Enabled(v) => IoHandle::Enabled(v.handle()), - IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()), - } - }] - */ - - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, _handle: &Handle) { match self { IoStack::Enabled(v) => v.park(), IoStack::Disabled(v) => v.park(), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { match self { IoStack::Enabled(v) => v.park_timeout(duration), IoStack::Disabled(v) => v.park_timeout(duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, _handle: &Handle) { match self { IoStack::Enabled(v) => v.shutdown(), IoStack::Disabled(v) => v.shutdown(), @@ -181,12 +184,28 @@ cfg_io_driver! { cfg_not_io_driver! { pub(crate) type IoHandle = UnparkThread; - pub(crate) type IoStack = ParkThread; + + #[derive(Debug)] + pub(crate) struct IoStack(ParkThread); fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); - Ok((park_thread, unpark_thread, Default::default())) + Ok((IoStack(park_thread), unpark_thread, Default::default())) + } + + impl IoStack { + pub(crate) fn park(&mut self, _handle: &Handle) { + self.0.park(); + } + + pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { + self.0.park_timeout(duration); + } + + pub(crate) fn shutdown(&mut self, _handle: &Handle) { + self.0.shutdown(); + } } } @@ -249,7 +268,6 @@ cfg_time! { pub(crate) enum TimeDriver { Enabled { driver: crate::runtime::time::Driver, - handle: crate::runtime::time::Handle, }, Disabled(IoStack), } @@ -269,31 +287,31 @@ cfg_time! { if enable { let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); - (TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle)) + (TimeDriver::Enabled { driver }, Some(handle)) } else { (TimeDriver::Disabled(io_stack), None) } } impl TimeDriver { - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.park(handle), - TimeDriver::Disabled(v) => v.park(), + TimeDriver::Enabled { driver, .. } => driver.park(handle), + TimeDriver::Disabled(v) => v.park(handle), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { match self { - TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration), - TimeDriver::Disabled(v) => v.park_timeout(duration), + TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration), + TimeDriver::Disabled(v) => v.park_timeout(handle, duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.shutdown(handle), - TimeDriver::Disabled(v) => v.shutdown(), + TimeDriver::Enabled { driver } => driver.shutdown(handle), + TimeDriver::Disabled(v) => v.shutdown(handle), } } } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 666be6b13f2..d11d93253ad 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -247,7 +247,7 @@ impl Drop for CurrentThread { // Shutdown the resource drivers if let Some(driver) = core.driver.as_mut() { - driver.shutdown(); + driver.shutdown(&self.handle.driver); } (core, ()) @@ -314,7 +314,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (c, _) = self.enter(core, || { - driver.park(); + driver.park(&self.handle.driver); }); core = c; @@ -339,7 +339,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (mut core, _) = self.enter(core, || { - driver.park_timeout(Duration::from_millis(0)); + driver.park_timeout(&self.handle.driver, Duration::from_millis(0)); }); core.driver = Some(driver); diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index e214086d4d0..0f7dae49ece 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -43,10 +43,7 @@ impl Handle { cfg_time! { #[track_caller] pub(crate) fn time(&self) -> &crate::runtime::time::Handle { - self.driver() - .time - .as_ref() - .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + self.driver().time() } cfg_test_util! { diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 46432f4f036..6bdbff961e3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -64,21 +64,21 @@ impl Parker { } } - pub(crate) fn park(&mut self) { - self.inner.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.inner.park(handle); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { // Only parking with zero is supported... assert_eq!(duration, Duration::from_millis(0)); if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(duration) + driver.park_timeout(handle, duration) } } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown(); + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.inner.shutdown(handle); } } @@ -103,7 +103,7 @@ impl Unparker { impl Inner { /// Parks the current thread for at most `dur`. - fn park(&self) { + fn park(&self, handle: &driver::Handle) { for _ in 0..3 { // If we were previously notified then we consume this notification and // return quickly. @@ -119,7 +119,7 @@ impl Inner { } if let Some(mut driver) = self.shared.driver.try_lock() { - self.park_driver(&mut driver); + self.park_driver(&mut driver, handle); } else { self.park_condvar(); } @@ -165,7 +165,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut Driver) { + fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -186,7 +186,7 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {}", actual), } - driver.park(); + driver.park(handle); match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! @@ -227,9 +227,9 @@ impl Inner { self.condvar.notify_one() } - fn shutdown(&self) { + fn shutdown(&self, handle: &driver::Handle) { if let Some(mut driver) = self.shared.driver.try_lock() { - driver.shutdown(); + driver.shutdown(handle); } self.condvar.notify_all(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 34ef0d9f126..7a7e676dcc9 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -522,9 +522,9 @@ impl Context { // Park thread if let Some(timeout) = duration { - park.park_timeout(timeout); + park.park_timeout(&self.worker.handle.driver, timeout); } else { - park.park(); + park.park(&self.worker.handle.driver); } // Remove `core` from context @@ -687,14 +687,14 @@ impl Core { } /// Shuts down the core. - fn shutdown(&mut self) { + fn shutdown(&mut self, handle: &Handle) { // Take the core let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} - park.shutdown(); + park.shutdown(&handle.driver); } } @@ -829,7 +829,7 @@ impl Handle { debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { - core.shutdown(); + core.shutdown(self); } // Drain the injection queue diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index ec4333bf8df..a935f8cbb42 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -20,7 +20,7 @@ mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; -use crate::runtime::driver::{IoHandle, IoStack}; +use crate::runtime::driver::{self, IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; @@ -165,15 +165,17 @@ impl Driver { (driver, handle) } - pub(crate) fn park(&mut self, handle: &Handle) { + pub(crate) fn park(&mut self, handle: &driver::Handle) { self.park_internal(handle, None) } - pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { self.park_internal(handle, Some(duration)) } - pub(crate) fn shutdown(&mut self, handle: &Handle) { + pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.time(); + if handle.is_shutdown() { return; } @@ -184,10 +186,11 @@ impl Driver { handle.process_at_time(u64::MAX); - self.park.shutdown(); + self.park.shutdown(rt_handle); } - fn park_internal(&mut self, handle: &Handle, limit: Option) { + fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { + let handle = rt_handle.time(); let mut lock = handle.get().state.lock(); assert!(!handle.is_shutdown()); @@ -211,16 +214,16 @@ impl Driver { duration = std::cmp::min(limit, duration); } - self.park_thread_timeout(duration); + self.park_thread_timeout(rt_handle, duration); } else { - self.park.park_timeout(Duration::from_secs(0)); + self.park.park_timeout(rt_handle, Duration::from_secs(0)); } } None => { if let Some(duration) = limit { - self.park_thread_timeout(duration); + self.park_thread_timeout(rt_handle, duration); } else { - self.park.park(); + self.park.park(rt_handle); } } } @@ -230,11 +233,11 @@ impl Driver { } cfg_test_util! { - fn park_thread_timeout(&mut self, duration: Duration) { + fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { let clock = &self.time_source.clock; if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0)); + self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed // before the "duration" elapsed (usually caused by a @@ -245,7 +248,7 @@ impl Driver { clock.advance(duration); } } else { - self.park.park_timeout(duration); + self.park.park_timeout(rt_handle, duration); } } @@ -255,8 +258,8 @@ impl Driver { } cfg_not_test_util! { - fn park_thread_timeout(&mut self, duration: Duration) { - self.park.park_timeout(duration); + fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { + self.park.park_timeout(rt_handle, duration); } } }