Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: remove a reference to internal time handle #5107

Merged
merged 3 commits into from Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 49 additions & 31 deletions tokio/src/runtime/driver.rs
Expand Up @@ -58,16 +58,16 @@ impl Driver {
))
}

pub(crate) fn park(&mut self) {
self.inner.park()
pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle)
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.inner.park_timeout(duration)
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
self.inner.park_timeout(handle, duration)
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown()
pub(crate) fn shutdown(&mut self, handle: &Handle) {
self.inner.shutdown(handle)
}
}

Expand All @@ -80,6 +80,18 @@ impl Handle {

self.io.unpark();
}

cfg_time! {
/// Returns a reference to the time driver handle.
///
/// Panics if no time driver is present.
#[track_caller]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this #[track_caller] actually reach all the way to the user anywhere? Can you update the panic location tests if necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The panic message is already tested here.

pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
self.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}

// ===== io driver =====
Expand Down Expand Up @@ -121,30 +133,21 @@ cfg_io_driver! {
}

impl IoStack {
/*
pub(crate) fn handle(&self) -> IoHandle {
match self {
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
}
}]
*/

pub(crate) fn park(&mut self) {
pub(crate) fn park(&mut self, _handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(),
IoStack::Disabled(v) => v.park(),
}
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
match self {
IoStack::Enabled(v) => v.park_timeout(duration),
IoStack::Disabled(v) => v.park_timeout(duration),
}
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self, _handle: &Handle) {
match self {
IoStack::Enabled(v) => v.shutdown(),
IoStack::Disabled(v) => v.shutdown(),
Expand Down Expand Up @@ -181,12 +184,28 @@ cfg_io_driver! {

cfg_not_io_driver! {
pub(crate) type IoHandle = UnparkThread;
pub(crate) type IoStack = ParkThread;

#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((park_thread, unpark_thread, Default::default()))
Ok((IoStack(park_thread), unpark_thread, Default::default()))
}

impl IoStack {
pub(crate) fn park(&mut self, _handle: &Handle) {
self.0.park();
}

pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
self.0.park_timeout(duration);
}

pub(crate) fn shutdown(&mut self, _handle: &Handle) {
self.0.shutdown();
}
}
}

Expand Down Expand Up @@ -249,7 +268,6 @@ cfg_time! {
pub(crate) enum TimeDriver {
Enabled {
driver: crate::runtime::time::Driver,
handle: crate::runtime::time::Handle,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the additional time handle ref to remove. Removing this gets rid of arc clones below.

},
Disabled(IoStack),
}
Expand All @@ -269,31 +287,31 @@ cfg_time! {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);

(TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle))
(TimeDriver::Enabled { driver }, Some(handle))
} else {
(TimeDriver::Disabled(io_stack), None)
}
}

impl TimeDriver {
pub(crate) fn park(&mut self) {
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park(handle),
TimeDriver::Disabled(v) => v.park(),
TimeDriver::Enabled { driver, .. } => driver.park(handle),
TimeDriver::Disabled(v) => v.park(handle),
}
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration),
TimeDriver::Disabled(v) => v.park_timeout(duration),
TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
}
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, handle } => driver.shutdown(handle),
TimeDriver::Disabled(v) => v.shutdown(),
TimeDriver::Enabled { driver } => driver.shutdown(handle),
TimeDriver::Disabled(v) => v.shutdown(handle),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -247,7 +247,7 @@ impl Drop for CurrentThread {

// Shutdown the resource drivers
if let Some(driver) = core.driver.as_mut() {
driver.shutdown();
driver.shutdown(&self.handle.driver);
}

(core, ())
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Context {
core.metrics.submit(&self.handle.shared.worker_metrics);

let (c, _) = self.enter(core, || {
driver.park();
driver.park(&self.handle.driver);
});

core = c;
Expand All @@ -339,7 +339,7 @@ impl Context {

core.metrics.submit(&self.handle.shared.worker_metrics);
let (mut core, _) = self.enter(core, || {
driver.park_timeout(Duration::from_millis(0));
driver.park_timeout(&self.handle.driver, Duration::from_millis(0));
});

core.driver = Some(driver);
Expand Down
5 changes: 1 addition & 4 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -43,10 +43,7 @@ impl Handle {
cfg_time! {
#[track_caller]
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this #[track_caller].

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The panic message is already tested here.

self.driver()
.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
self.driver().time()
}

cfg_test_util! {
Expand Down
24 changes: 12 additions & 12 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Expand Up @@ -64,21 +64,21 @@ impl Parker {
}
}

pub(crate) fn park(&mut self) {
self.inner.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));

if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(duration)
driver.park_timeout(handle, duration)
}
}

pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}

Expand All @@ -103,7 +103,7 @@ impl Unparker {

impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self) {
fn park(&self, handle: &driver::Handle) {
for _ in 0..3 {
// If we were previously notified then we consume this notification and
// return quickly.
Expand All @@ -119,7 +119,7 @@ impl Inner {
}

if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver);
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Inner {
}
}

fn park_driver(&self, driver: &mut Driver) {
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
Expand All @@ -186,7 +186,7 @@ impl Inner {
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}

driver.park();
driver.park(handle);

match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
Expand Down Expand Up @@ -227,9 +227,9 @@ impl Inner {
self.condvar.notify_one()
}

fn shutdown(&self) {
fn shutdown(&self, handle: &driver::Handle) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
driver.shutdown(handle);
}

self.condvar.notify_all();
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -522,9 +522,9 @@ impl Context {

// Park thread
if let Some(timeout) = duration {
park.park_timeout(timeout);
park.park_timeout(&self.worker.handle.driver, timeout);
} else {
park.park();
park.park(&self.worker.handle.driver);
}

// Remove `core` from context
Expand Down Expand Up @@ -687,14 +687,14 @@ impl Core {
}

/// Shuts down the core.
fn shutdown(&mut self) {
fn shutdown(&mut self, handle: &Handle) {
// Take the core
let mut park = self.park.take().expect("park missing");

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
park.shutdown(&handle.driver);
}
}

Expand Down Expand Up @@ -829,7 +829,7 @@ impl Handle {
debug_assert!(self.shared.owned.is_empty());

for mut core in cores.drain(..) {
core.shutdown();
core.shutdown(self);
}

// Drain the injection queue
Expand Down