Skip to content

Commit

Permalink
rt: reduce no-op wakes in multi-threaded runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Jan 11, 2022
1 parent fbb40a6 commit 175d8b8
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
8 changes: 8 additions & 0 deletions tokio/src/runtime/queue.rs
Expand Up @@ -90,6 +90,7 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
impl<T> Local<T> {
/// Returns true if the queue has entries that can be stealed.
pub(super) fn is_stealable(&self) -> bool {
// self.inner.len() > 4
!self.inner.is_empty()
}

Expand Down Expand Up @@ -469,6 +470,13 @@ impl<T> Drop for Local<T> {
}

impl<T> Inner<T> {
fn len(&self) -> u16 {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);

tail.wrapping_sub(head)
}

fn is_empty(&self) -> bool {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
Expand Down
16 changes: 10 additions & 6 deletions tokio/src/runtime/thread_pool/idle.rs
Expand Up @@ -64,7 +64,7 @@ impl Idle {

// A worker should be woken up, atomically increment the number of
// searching workers as well as the number of unparked workers.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 1);

// Get the worker to unpark
let ret = sleepers.pop();
Expand Down Expand Up @@ -111,19 +111,23 @@ impl Idle {

/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
///
/// Returns `true` if the worker was parked before calling the method.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
let mut sleepers = self.sleepers.lock();

for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);

// Update the state accordingly while the lock is held.
State::unpark_one(&self.state);
State::unpark_one(&self.state, 0);

return;
return true;
}
}

false
}

/// Returns `true` if `worker_id` is contained in the sleep set.
Expand Down Expand Up @@ -151,8 +155,8 @@ impl State {
State(cell.load(ordering))
}

fn unpark_one(cell: &AtomicUsize) {
cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
}

fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -512,7 +512,7 @@ impl Context {
core.park = Some(park);

// If there are tasks available to steal, notify a worker
if core.run_queue.is_stealable() {
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}

Expand Down Expand Up @@ -620,8 +620,7 @@ impl Core {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
return true;
}

Expand Down

0 comments on commit 175d8b8

Please sign in to comment.