Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: reduce no-op wakeups in the multi-threaded scheduler #4383

Merged
merged 4 commits into from Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions tokio/src/runtime/thread_pool/idle.rs
Expand Up @@ -64,7 +64,7 @@ impl Idle {

// A worker should be woken up, atomically increment the number of
// searching workers as well as the number of unparked workers.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 1);

// Get the worker to unpark
let ret = sleepers.pop();
Expand Down Expand Up @@ -111,19 +111,23 @@ impl Idle {

/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
///
/// Returns `true` if the worker was parked before calling the method.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
let mut sleepers = self.sleepers.lock();

for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);

// Update the state accordingly while the lock is held.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 0);

return;
return true;
}
}

false
}

/// Returns `true` if `worker_id` is contained in the sleep set.
Expand Down Expand Up @@ -151,8 +155,8 @@ impl State {
State(cell.load(ordering))
}

fn unpark_one(cell: &AtomicUsize) {
cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
}

fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -511,7 +511,7 @@ impl Context {
core.park = Some(park);

// If there are tasks available to steal, notify a worker
carllerche marked this conversation as resolved.
Show resolved Hide resolved
if core.run_queue.is_stealable() {
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}

Expand Down Expand Up @@ -620,8 +620,7 @@ impl Core {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key bit. When the worker wakes, it only enters the "searching" state if it was unparked by another thread. This distinguishes unparks from other threads to signal to the worker that it should steal work vs. the I/O driver unparked the thread because events arrived.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth having a comment in the code explaining that? not a hard blocker though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return true;
}

Expand Down