Skip to content

Commit

Permalink
rt: remove a reference to internal time handle
Browse files Browse the repository at this point in the history
This patch removes a handle to the internal runtime driver handle held
by the runtime. This is another step towards reducing the number of Arc
refs across the runtime internals. Specifically, this change is part of
an effort to remove an Arc in the time driver itself.
  • Loading branch information
carllerche committed Oct 14, 2022
1 parent bf5eed8 commit 2e0e021
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 43 deletions.
48 changes: 25 additions & 23 deletions tokio/src/runtime/driver.rs
Expand Up @@ -58,16 +58,16 @@ impl Driver {
))
}

pub(crate) fn park(&mut self) {
self.inner.park()
pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle)
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.inner.park_timeout(duration)
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
self.inner.park_timeout(handle, duration)
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown()
pub(crate) fn shutdown(&mut self, handle: &Handle) {
self.inner.shutdown(handle)
}
}

Expand Down Expand Up @@ -121,15 +121,6 @@ cfg_io_driver! {
}

impl IoStack {
/*
pub(crate) fn handle(&self) -> IoHandle {
match self {
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
}
}]
*/

pub(crate) fn park(&mut self) {
match self {
IoStack::Enabled(v) => v.park(),
Expand Down Expand Up @@ -249,7 +240,6 @@ cfg_time! {
pub(crate) enum TimeDriver {
Enabled {
driver: crate::runtime::time::Driver,
handle: crate::runtime::time::Handle,
},
Disabled(IoStack),
}
Expand All @@ -269,30 +259,42 @@ cfg_time! {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);

(TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle))
(TimeDriver::Enabled { driver }, Some(handle))
} else {
(TimeDriver::Disabled(io_stack), None)
}
}

impl TimeDriver {
pub(crate) fn park(&mut self) {
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park(handle),
TimeDriver::Enabled { driver, .. } => {
// If the time driver is enabled, a handle is set.
let handle = handle.time.as_ref().unwrap();
driver.park(handle)
}
TimeDriver::Disabled(v) => v.park(),
}
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration),
TimeDriver::Enabled { driver } => {
// If the time driver is enabled, a handle is set.
let handle = handle.time.as_ref().unwrap();
driver.park_timeout(handle, duration)
}
TimeDriver::Disabled(v) => v.park_timeout(duration),
}
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.shutdown(handle),
TimeDriver::Enabled { driver } => {
// If the time driver is enabled, a handle is set.
let handle = handle.time.as_ref().unwrap();
driver.shutdown(handle)
}
TimeDriver::Disabled(v) => v.shutdown(),
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -247,7 +247,7 @@ impl Drop for CurrentThread {

// Shutdown the resource drivers
if let Some(driver) = core.driver.as_mut() {
driver.shutdown();
driver.shutdown(&self.handle.driver);
}

(core, ())
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Context {
core.metrics.submit(&self.handle.shared.worker_metrics);

let (c, _) = self.enter(core, || {
driver.park();
driver.park(&self.handle.driver);
});

core = c;
Expand All @@ -339,7 +339,7 @@ impl Context {

core.metrics.submit(&self.handle.shared.worker_metrics);
let (mut core, _) = self.enter(core, || {
driver.park_timeout(Duration::from_millis(0));
driver.park_timeout(&self.handle.driver, Duration::from_millis(0));
});

core.driver = Some(driver);
Expand Down
24 changes: 12 additions & 12 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Expand Up @@ -64,21 +64,21 @@ impl Parker {
}
}

pub(crate) fn park(&mut self) {
self.inner.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));

if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(duration)
driver.park_timeout(handle, duration)
}
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}

Expand All @@ -103,7 +103,7 @@ impl Unparker {

impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self) {
fn park(&self, handle: &driver::Handle) {
for _ in 0..3 {
// If we were previously notified then we consume this notification and
// return quickly.
Expand All @@ -119,7 +119,7 @@ impl Inner {
}

if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver);
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Inner {
}
}

fn park_driver(&self, driver: &mut Driver) {
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
Expand All @@ -186,7 +186,7 @@ impl Inner {
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}

driver.park();
driver.park(handle);

match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
Expand Down Expand Up @@ -227,9 +227,9 @@ impl Inner {
self.condvar.notify_one()
}

fn shutdown(&self) {
fn shutdown(&self, handle: &driver::Handle) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
driver.shutdown(handle);
}

self.condvar.notify_all();
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -522,9 +522,9 @@ impl Context {

// Park thread
if let Some(timeout) = duration {
park.park_timeout(timeout);
park.park_timeout(&self.worker.handle.driver, timeout);
} else {
park.park();
park.park(&self.worker.handle.driver);
}

// Remove `core` from context
Expand Down Expand Up @@ -687,14 +687,14 @@ impl Core {
}

/// Shuts down the core.
fn shutdown(&mut self) {
fn shutdown(&mut self, handle: &Handle) {
// Take the core
let mut park = self.park.take().expect("park missing");

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
park.shutdown(&handle.driver);
}
}

Expand Down Expand Up @@ -829,7 +829,7 @@ impl Handle {
debug_assert!(self.shared.owned.is_empty());

for mut core in cores.drain(..) {
core.shutdown();
core.shutdown(self);
}

// Drain the injection queue
Expand Down

0 comments on commit 2e0e021

Please sign in to comment.