From 49192be3de9fbac56b3db026a438640f0b27f0e5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 10 Jan 2022 16:15:54 -0800 Subject: [PATCH] rt: reduce no-op wakes in multi-threaded runtime --- tokio/src/runtime/thread_pool/idle.rs | 16 ++++++++++------ tokio/src/runtime/thread_pool/worker.rs | 5 ++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index 6b7ee1289ce..a57bf6a0b13 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -64,7 +64,7 @@ impl Idle { // A worker should be woken up, atomically increment the number of // searching workers as well as the number of unparked workers. - State::unpark_one(&self.state); + State::unpark_one(&self.state, 1); // Get the worker to unpark let ret = sleepers.pop(); @@ -111,7 +111,9 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. - pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { + /// + /// Returns `true` if the worker was parked before calling the method. + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool { let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { @@ -119,11 +121,13 @@ impl Idle { sleepers.swap_remove(index); // Update the state accordingly while the lock is held. - State::unpark_one(&self.state); + State::unpark_one(&self.state, 0); - return; + return true; } } + + false } /// Returns `true` if `worker_id` is contained in the sleep set. @@ -151,8 +155,8 @@ impl State { State(cell.load(ordering)) } - fn unpark_one(cell: &AtomicUsize) { - cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst); + fn unpark_one(cell: &AtomicUsize, num_searching: usize) { + cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst); } fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index ae8efe6724f..b091fa93d50 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -511,7 +511,7 @@ impl Context { core.park = Some(park); // If there are tasks available to steal, notify a worker - if core.run_queue.is_stealable() { + if !core.is_searching && core.run_queue.is_stealable() { self.worker.shared.notify_parked(); } @@ -620,8 +620,7 @@ impl Core { // If a task is in the lifo slot, then we must unpark regardless of // being notified if self.lifo_slot.is_some() { - worker.shared.idle.unpark_worker_by_id(worker.index); - self.is_searching = true; + self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index); return true; }