diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index f75e672c5eb..93225a11e64 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -90,6 +90,7 @@ pub(super) fn local() -> (Steal, Local) { impl Local { /// Returns true if the queue has entries that can be stealed. pub(super) fn is_stealable(&self) -> bool { + // self.inner.len() > 4 !self.inner.is_empty() } @@ -469,6 +470,13 @@ impl Drop for Local { } impl Inner { + fn len(&self) -> u16 { + let (_, head) = unpack(self.head.load(Acquire)); + let tail = self.tail.load(Acquire); + + tail.wrapping_sub(head) + } + fn is_empty(&self) -> bool { let (_, head) = unpack(self.head.load(Acquire)); let tail = self.tail.load(Acquire); diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index 6b7ee1289ce..a57bf6a0b13 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -64,7 +64,7 @@ impl Idle { // A worker should be woken up, atomically increment the number of // searching workers as well as the number of unparked workers. - State::unpark_one(&self.state); + State::unpark_one(&self.state, 1); // Get the worker to unpark let ret = sleepers.pop(); @@ -111,7 +111,9 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. - pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { + /// + /// Returns `true` if the worker was parked before calling the method. + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool { let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { @@ -119,11 +121,13 @@ impl Idle { sleepers.swap_remove(index); // Update the state accordingly while the lock is held. - State::unpark_one(&self.state); + State::unpark_one(&self.state, 0); - return; + return true; } } + + false } /// Returns `true` if `worker_id` is contained in the sleep set. @@ -151,8 +155,8 @@ impl State { State(cell.load(ordering)) } - fn unpark_one(cell: &AtomicUsize) { - cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst); + fn unpark_one(cell: &AtomicUsize, num_searching: usize) { + cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst); } fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index f4ddb654a15..a6ec55c7443 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -512,7 +512,7 @@ impl Context { core.park = Some(park); // If there are tasks available to steal, notify a worker - if core.run_queue.is_stealable() { + if !core.is_searching && core.run_queue.is_stealable() { self.worker.shared.notify_parked(); } @@ -620,8 +620,7 @@ impl Core { // If a task is in the lifo slot, then we must unpark regardless of // being notified if self.lifo_slot.is_some() { - worker.shared.idle.unpark_worker_by_id(worker.index); - self.is_searching = true; + self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index); return true; }