Skip to content

Commit

Permalink
rt: remove a reference to internal time handle (#5107)
Browse files Browse the repository at this point in the history
This patch removes a handle to the internal runtime driver handle held
by the runtime. This is another step towards reducing the number of Arc
refs across the runtime internals. Specifically, this change is part of
an effort to remove an Arc in the time driver itself.
  • Loading branch information
carllerche committed Oct 17, 2022
1 parent 00082c6 commit a03e042
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 70 deletions.
80 changes: 49 additions & 31 deletions tokio/src/runtime/driver.rs
Expand Up @@ -58,16 +58,16 @@ impl Driver {
))
}

pub(crate) fn park(&mut self) {
self.inner.park()
pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle)
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.inner.park_timeout(duration)
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
self.inner.park_timeout(handle, duration)
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown()
pub(crate) fn shutdown(&mut self, handle: &Handle) {
self.inner.shutdown(handle)
}
}

Expand All @@ -80,6 +80,18 @@ impl Handle {

self.io.unpark();
}

cfg_time! {
/// Returns a reference to the time driver handle.
///
/// Panics if no time driver is present.
#[track_caller]
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
self.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}

// ===== io driver =====
Expand Down Expand Up @@ -121,30 +133,21 @@ cfg_io_driver! {
}

impl IoStack {
/*
pub(crate) fn handle(&self) -> IoHandle {
match self {
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
}
}]
*/

pub(crate) fn park(&mut self) {
pub(crate) fn park(&mut self, _handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(),
IoStack::Disabled(v) => v.park(),
}
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
match self {
IoStack::Enabled(v) => v.park_timeout(duration),
IoStack::Disabled(v) => v.park_timeout(duration),
}
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self, _handle: &Handle) {
match self {
IoStack::Enabled(v) => v.shutdown(),
IoStack::Disabled(v) => v.shutdown(),
Expand Down Expand Up @@ -181,12 +184,28 @@ cfg_io_driver! {

cfg_not_io_driver! {
pub(crate) type IoHandle = UnparkThread;
pub(crate) type IoStack = ParkThread;

#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((park_thread, unpark_thread, Default::default()))
Ok((IoStack(park_thread), unpark_thread, Default::default()))
}

impl IoStack {
pub(crate) fn park(&mut self, _handle: &Handle) {
self.0.park();
}

pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
self.0.park_timeout(duration);
}

pub(crate) fn shutdown(&mut self, _handle: &Handle) {
self.0.shutdown();
}
}
}

Expand Down Expand Up @@ -249,7 +268,6 @@ cfg_time! {
pub(crate) enum TimeDriver {
Enabled {
driver: crate::runtime::time::Driver,
handle: crate::runtime::time::Handle,
},
Disabled(IoStack),
}
Expand All @@ -269,31 +287,31 @@ cfg_time! {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);

(TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle))
(TimeDriver::Enabled { driver }, Some(handle))
} else {
(TimeDriver::Disabled(io_stack), None)
}
}

impl TimeDriver {
pub(crate) fn park(&mut self) {
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park(handle),
TimeDriver::Disabled(v) => v.park(),
TimeDriver::Enabled { driver, .. } => driver.park(handle),
TimeDriver::Disabled(v) => v.park(handle),
}
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration),
TimeDriver::Disabled(v) => v.park_timeout(duration),
TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
}
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.shutdown(handle),
TimeDriver::Disabled(v) => v.shutdown(),
TimeDriver::Enabled { driver } => driver.shutdown(handle),
TimeDriver::Disabled(v) => v.shutdown(handle),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -247,7 +247,7 @@ impl Drop for CurrentThread {

// Shutdown the resource drivers
if let Some(driver) = core.driver.as_mut() {
driver.shutdown();
driver.shutdown(&self.handle.driver);
}

(core, ())
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Context {
core.metrics.submit(&self.handle.shared.worker_metrics);

let (c, _) = self.enter(core, || {
driver.park();
driver.park(&self.handle.driver);
});

core = c;
Expand All @@ -339,7 +339,7 @@ impl Context {

core.metrics.submit(&self.handle.shared.worker_metrics);
let (mut core, _) = self.enter(core, || {
driver.park_timeout(Duration::from_millis(0));
driver.park_timeout(&self.handle.driver, Duration::from_millis(0));
});

core.driver = Some(driver);
Expand Down
5 changes: 1 addition & 4 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -43,10 +43,7 @@ impl Handle {
cfg_time! {
#[track_caller]
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
self.driver()
.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
self.driver().time()
}

cfg_test_util! {
Expand Down
24 changes: 12 additions & 12 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Expand Up @@ -64,21 +64,21 @@ impl Parker {
}
}

pub(crate) fn park(&mut self) {
self.inner.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));

if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(duration)
driver.park_timeout(handle, duration)
}
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}

Expand All @@ -103,7 +103,7 @@ impl Unparker {

impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self) {
fn park(&self, handle: &driver::Handle) {
for _ in 0..3 {
// If we were previously notified then we consume this notification and
// return quickly.
Expand All @@ -119,7 +119,7 @@ impl Inner {
}

if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver);
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Inner {
}
}

fn park_driver(&self, driver: &mut Driver) {
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
Expand All @@ -186,7 +186,7 @@ impl Inner {
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}

driver.park();
driver.park(handle);

match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
Expand Down Expand Up @@ -227,9 +227,9 @@ impl Inner {
self.condvar.notify_one()
}

fn shutdown(&self) {
fn shutdown(&self, handle: &driver::Handle) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
driver.shutdown(handle);
}

self.condvar.notify_all();
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -522,9 +522,9 @@ impl Context {

// Park thread
if let Some(timeout) = duration {
park.park_timeout(timeout);
park.park_timeout(&self.worker.handle.driver, timeout);
} else {
park.park();
park.park(&self.worker.handle.driver);
}

// Remove `core` from context
Expand Down Expand Up @@ -687,14 +687,14 @@ impl Core {
}

/// Shuts down the core.
fn shutdown(&mut self) {
fn shutdown(&mut self, handle: &Handle) {
// Take the core
let mut park = self.park.take().expect("park missing");

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
park.shutdown(&handle.driver);
}
}

Expand Down Expand Up @@ -829,7 +829,7 @@ impl Handle {
debug_assert!(self.shared.owned.is_empty());

for mut core in cores.drain(..) {
core.shutdown();
core.shutdown(self);
}

// Drain the injection queue
Expand Down

0 comments on commit a03e042

Please sign in to comment.