Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: reduce no-op wakeups in the multi-threaded scheduler #4383

Merged
merged 4 commits into from Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions tokio/src/runtime/thread_pool/idle.rs
Expand Up @@ -64,7 +64,7 @@ impl Idle {

// A worker should be woken up, atomically increment the number of
// searching workers as well as the number of unparked workers.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 1);

// Get the worker to unpark
let ret = sleepers.pop();
Expand Down Expand Up @@ -111,19 +111,23 @@ impl Idle {

/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
///
/// Returns `true` if the worker was parked before calling the method.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
let mut sleepers = self.sleepers.lock();

for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);

// Update the state accordingly while the lock is held.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 0);

return;
return true;
}
}

false
}

/// Returns `true` if `worker_id` is contained in the sleep set.
Expand Down Expand Up @@ -151,8 +155,8 @@ impl State {
State(cell.load(ordering))
}

fn unpark_one(cell: &AtomicUsize) {
cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
}

fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
Expand Down
12 changes: 8 additions & 4 deletions tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -511,8 +511,9 @@ impl Context {
// Place `park` back in `core`
core.park = Some(park);

// If there are tasks available to steal, notify a worker
if core.run_queue.is_stealable() {
// If there are tasks available to steal, but this worker is not
// looking for tasks to steal, notify another worker.
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}

Expand Down Expand Up @@ -621,8 +622,11 @@ impl Core {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
// When a worker wakes, it should only transition to the "searching"
// state when the wake originates from another worker *or* a new task
// is pushed. We do *not* want the worker to transition to "searching"
// when it wakes when the I/O driver receives new events.
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key bit. When the worker wakes, it only enters the "searching" state if it was unparked by another thread. This distinguishes unparks from other threads to signal to the worker that it should steal work vs. the I/O driver unparked the thread because events arrived.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth having a comment in the code explaining that? not a hard blocker though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return true;
}

Expand Down