Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: Notify another worker if unparked from driver #6245

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,21 @@ impl Parker {
}
}

pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
/// Returns whether this worker has obtained driver
pub(crate) fn park(&mut self, handle: &driver::Handle) -> bool {
self.inner.park(handle)
}

pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
/// Returns whether this worker has obtained driver
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) -> bool {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));

if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(handle, duration);
true
} else {
false
}
}

Expand Down Expand Up @@ -101,22 +106,24 @@ impl Unparker {
}

impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self, handle: &driver::Handle) {
/// Returns whether this worker has obtained driver.
fn park(&self, handle: &driver::Handle) -> bool {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
return false;
}

if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver, handle);
true
} else {
self.park_condvar();
false
}
}

Expand Down
26 changes: 24 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ struct Core {
/// involves attempting to steal from other workers.
is_searching: bool,

/// True if the worker has just acquired driver lock. If so, the core
/// attempts to notify another worker to park on the io driver before running
/// tasks if there is no other searching worker.
did_acquire_driver_lock: bool,

/// True if the scheduler is being shutdown
is_shutdown: bool,

Expand Down Expand Up @@ -266,6 +271,7 @@ pub(super) fn create(
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
did_acquire_driver_lock: false,
is_shutdown: false,
is_traced: false,
park: Some(park),
Expand Down Expand Up @@ -560,6 +566,10 @@ impl Context {
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);

// Attempt to notify a worker to park on IO driver if the current worker has
// previously obtained driver lock.
core.notify_for_driver_before_running(&self.worker);

self.assert_lifo_enabled_is_correct(&core);

// Measure the poll start time. Note that we may end up polling other
Expand Down Expand Up @@ -716,11 +726,14 @@ impl Context {
// Store `core` in context
*self.core.borrow_mut() = Some(core);

// Whether this park has obtained driver lock
let park_has_driver;

// Park thread
if let Some(timeout) = duration {
park.park_timeout(&self.worker.handle.driver, timeout);
park_has_driver = park.park_timeout(&self.worker.handle.driver, timeout);
} else {
park.park(&self.worker.handle.driver);
park_has_driver = park.park(&self.worker.handle.driver);
}

self.defer.wake();
Expand All @@ -735,6 +748,8 @@ impl Context {
self.worker.handle.notify_parked_local();
}

core.did_acquire_driver_lock = park_has_driver;

core
}

Expand Down Expand Up @@ -861,6 +876,13 @@ impl Core {
worker.handle.transition_worker_from_searching();
}

fn notify_for_driver_before_running(&mut self, worker: &Worker) {
if self.did_acquire_driver_lock {
worker.handle.notify_parked_local();
self.did_acquire_driver_lock = false;
}
Comment on lines +880 to +883
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did_acquire_driver_lock is checked every time in run_task() after transition_from_searching() so that notify_parked_local() would not consider the current core as searching.

}

fn has_tasks(&self) -> bool {
self.lifo_slot.is_some() || self.run_queue.has_tasks()
}
Expand Down