diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 74d9009240a..6430edd52e1 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -58,16 +58,16 @@ impl Driver { )) } - pub(crate) fn park(&mut self) { - self.inner.park() + pub(crate) fn park(&mut self, handle: &Handle) { + self.inner.park(handle) } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.inner.park_timeout(duration) + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + self.inner.park_timeout(handle, duration) } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown() + pub(crate) fn shutdown(&mut self, handle: &Handle) { + self.inner.shutdown(handle) } } @@ -80,6 +80,18 @@ impl Handle { self.io.unpark(); } + + cfg_time! { + /// Returns a reference to the time driver handle. + /// + /// Panics if no time driver is present. + #[track_caller] + pub(crate) fn time(&self) -> &crate::runtime::time::Handle { + self.time + .as_ref() + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + } + } } // ===== io driver ===== @@ -121,30 +133,21 @@ cfg_io_driver! { } impl IoStack { - /* - pub(crate) fn handle(&self) -> IoHandle { - match self { - IoStack::Enabled(v) => IoHandle::Enabled(v.handle()), - IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()), - } - }] - */ - - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, _handle: &Handle) { match self { IoStack::Enabled(v) => v.park(), IoStack::Disabled(v) => v.park(), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { match self { IoStack::Enabled(v) => v.park_timeout(duration), IoStack::Disabled(v) => v.park_timeout(duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, _handle: &Handle) { match self { IoStack::Enabled(v) => v.shutdown(), IoStack::Disabled(v) => v.shutdown(), @@ -181,12 +184,28 @@ cfg_io_driver! { cfg_not_io_driver! { pub(crate) type IoHandle = UnparkThread; - pub(crate) type IoStack = ParkThread; + + #[derive(Debug)] + pub(crate) struct IoStack(ParkThread); fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); - Ok((park_thread, unpark_thread, Default::default())) + Ok((IoStack(park_thread), unpark_thread, Default::default())) + } + + impl IoStack { + pub(crate) fn park(&mut self, _handle: &Handle) { + self.0.park(); + } + + pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { + self.0.park_timeout(duration); + } + + pub(crate) fn shutdown(&mut self, _handle: &Handle) { + self.0.shutdown(); + } } } @@ -249,7 +268,6 @@ cfg_time! { pub(crate) enum TimeDriver { Enabled { driver: crate::runtime::time::Driver, - handle: crate::runtime::time::Handle, }, Disabled(IoStack), } @@ -269,31 +287,31 @@ cfg_time! { if enable { let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); - (TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle)) + (TimeDriver::Enabled { driver }, Some(handle)) } else { (TimeDriver::Disabled(io_stack), None) } } impl TimeDriver { - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.park(handle), - TimeDriver::Disabled(v) => v.park(), + TimeDriver::Enabled { driver, .. } => driver.park(handle), + TimeDriver::Disabled(v) => v.park(handle), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { match self { - TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration), - TimeDriver::Disabled(v) => v.park_timeout(duration), + TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration), + TimeDriver::Disabled(v) => v.park_timeout(handle, duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.shutdown(handle), - TimeDriver::Disabled(v) => v.shutdown(), + TimeDriver::Enabled { driver } => driver.shutdown(handle), + TimeDriver::Disabled(v) => v.shutdown(handle), } } } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 666be6b13f2..d11d93253ad 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -247,7 +247,7 @@ impl Drop for CurrentThread { // Shutdown the resource drivers if let Some(driver) = core.driver.as_mut() { - driver.shutdown(); + driver.shutdown(&self.handle.driver); } (core, ()) @@ -314,7 +314,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (c, _) = self.enter(core, || { - driver.park(); + driver.park(&self.handle.driver); }); core = c; @@ -339,7 +339,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (mut core, _) = self.enter(core, || { - driver.park_timeout(Duration::from_millis(0)); + driver.park_timeout(&self.handle.driver, Duration::from_millis(0)); }); core.driver = Some(driver); diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index e214086d4d0..0f7dae49ece 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -43,10 +43,7 @@ impl Handle { cfg_time! { #[track_caller] pub(crate) fn time(&self) -> &crate::runtime::time::Handle { - self.driver() - .time - .as_ref() - .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + self.driver().time() } cfg_test_util! { diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 46432f4f036..6bdbff961e3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -64,21 +64,21 @@ impl Parker { } } - pub(crate) fn park(&mut self) { - self.inner.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.inner.park(handle); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { // Only parking with zero is supported... assert_eq!(duration, Duration::from_millis(0)); if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(duration) + driver.park_timeout(handle, duration) } } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown(); + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.inner.shutdown(handle); } } @@ -103,7 +103,7 @@ impl Unparker { impl Inner { /// Parks the current thread for at most `dur`. - fn park(&self) { + fn park(&self, handle: &driver::Handle) { for _ in 0..3 { // If we were previously notified then we consume this notification and // return quickly. @@ -119,7 +119,7 @@ impl Inner { } if let Some(mut driver) = self.shared.driver.try_lock() { - self.park_driver(&mut driver); + self.park_driver(&mut driver, handle); } else { self.park_condvar(); } @@ -165,7 +165,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut Driver) { + fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -186,7 +186,7 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {}", actual), } - driver.park(); + driver.park(handle); match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! @@ -227,9 +227,9 @@ impl Inner { self.condvar.notify_one() } - fn shutdown(&self) { + fn shutdown(&self, handle: &driver::Handle) { if let Some(mut driver) = self.shared.driver.try_lock() { - driver.shutdown(); + driver.shutdown(handle); } self.condvar.notify_all(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 34ef0d9f126..7a7e676dcc9 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -522,9 +522,9 @@ impl Context { // Park thread if let Some(timeout) = duration { - park.park_timeout(timeout); + park.park_timeout(&self.worker.handle.driver, timeout); } else { - park.park(); + park.park(&self.worker.handle.driver); } // Remove `core` from context @@ -687,14 +687,14 @@ impl Core { } /// Shuts down the core. - fn shutdown(&mut self) { + fn shutdown(&mut self, handle: &Handle) { // Take the core let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} - park.shutdown(); + park.shutdown(&handle.driver); } } @@ -829,7 +829,7 @@ impl Handle { debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { - core.shutdown(); + core.shutdown(self); } // Drain the injection queue diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index ec4333bf8df..a935f8cbb42 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -20,7 +20,7 @@ mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; -use crate::runtime::driver::{IoHandle, IoStack}; +use crate::runtime::driver::{self, IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; @@ -165,15 +165,17 @@ impl Driver { (driver, handle) } - pub(crate) fn park(&mut self, handle: &Handle) { + pub(crate) fn park(&mut self, handle: &driver::Handle) { self.park_internal(handle, None) } - pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { self.park_internal(handle, Some(duration)) } - pub(crate) fn shutdown(&mut self, handle: &Handle) { + pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.time(); + if handle.is_shutdown() { return; } @@ -184,10 +186,11 @@ impl Driver { handle.process_at_time(u64::MAX); - self.park.shutdown(); + self.park.shutdown(rt_handle); } - fn park_internal(&mut self, handle: &Handle, limit: Option) { + fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { + let handle = rt_handle.time(); let mut lock = handle.get().state.lock(); assert!(!handle.is_shutdown()); @@ -211,16 +214,16 @@ impl Driver { duration = std::cmp::min(limit, duration); } - self.park_thread_timeout(duration); + self.park_thread_timeout(rt_handle, duration); } else { - self.park.park_timeout(Duration::from_secs(0)); + self.park.park_timeout(rt_handle, Duration::from_secs(0)); } } None => { if let Some(duration) = limit { - self.park_thread_timeout(duration); + self.park_thread_timeout(rt_handle, duration); } else { - self.park.park(); + self.park.park(rt_handle); } } } @@ -230,11 +233,11 @@ impl Driver { } cfg_test_util! { - fn park_thread_timeout(&mut self, duration: Duration) { + fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { let clock = &self.time_source.clock; if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0)); + self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed // before the "duration" elapsed (usually caused by a @@ -245,7 +248,7 @@ impl Driver { clock.advance(duration); } } else { - self.park.park_timeout(duration); + self.park.park_timeout(rt_handle, duration); } } @@ -255,8 +258,8 @@ impl Driver { } cfg_not_test_util! { - fn park_thread_timeout(&mut self, duration: Duration) { - self.park.park_timeout(duration); + fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { + self.park.park_timeout(rt_handle, duration); } } }