From 4ba08acfd86e611fe30fe898d200a2afaa291609 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 6 May 2022 13:14:08 -0600 Subject: [PATCH] Revert "rt: reduce no-op wakeups in the multi-threaded scheduler (#4383)" This reverts commit 4eed411519783ef6f58cbf74f886f91142b5cfa6. --- tokio/src/runtime/thread_pool/idle.rs | 16 ++++++---------- tokio/src/runtime/thread_pool/worker.rs | 12 ++++-------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index a57bf6a0b13..6b7ee1289ce 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -64,7 +64,7 @@ impl Idle { // A worker should be woken up, atomically increment the number of // searching workers as well as the number of unparked workers. - State::unpark_one(&self.state, 1); + State::unpark_one(&self.state); // Get the worker to unpark let ret = sleepers.pop(); @@ -111,9 +111,7 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. - /// - /// Returns `true` if the worker was parked before calling the method. - pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool { + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { @@ -121,13 +119,11 @@ impl Idle { sleepers.swap_remove(index); // Update the state accordingly while the lock is held. - State::unpark_one(&self.state, 0); + State::unpark_one(&self.state); - return true; + return; } } - - false } /// Returns `true` if `worker_id` is contained in the sleep set. @@ -155,8 +151,8 @@ impl State { State(cell.load(ordering)) } - fn unpark_one(cell: &AtomicUsize, num_searching: usize) { - cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst); + fn unpark_one(cell: &AtomicUsize) { + cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst); } fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 7e4989701e5..60706cad356 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -516,9 +516,8 @@ impl Context { // Place `park` back in `core` core.park = Some(park); - // If there are tasks available to steal, but this worker is not - // looking for tasks to steal, notify another worker. - if !core.is_searching && core.run_queue.is_stealable() { + // If there are tasks available to steal, notify a worker + if core.run_queue.is_stealable() { self.worker.shared.notify_parked(); } @@ -625,11 +624,8 @@ impl Core { // If a task is in the lifo slot, then we must unpark regardless of // being notified if self.lifo_slot.is_some() { - // When a worker wakes, it should only transition to the "searching" - // state when the wake originates from another worker *or* a new task - // is pushed. We do *not* want the worker to transition to "searching" - // when it wakes when the I/O driver receives new events. - self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index); + worker.shared.idle.unpark_worker_by_id(worker.index); + self.is_searching = true; return true; }