diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index a88dffcaf5d..f75e672c5eb 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -3,7 +3,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicU16, AtomicU32}; use crate::loom::sync::Arc; -use crate::runtime::stats::WorkerStatsBatcher; +use crate::runtime::stats::{WorkerStats, WorkerStatsBatcher}; use crate::runtime::task::{self, Inject}; use std::mem::MaybeUninit; @@ -102,7 +102,7 @@ impl Local { } /// Pushes a task to the back of the local queue, skipping the LIFO slot. - pub(super) fn push_back(&mut self, mut task: task::Notified, inject: &Inject) { + pub(super) fn push_back(&mut self, mut task: task::Notified, inject: &Inject, stats: &mut WorkerStatsBatcher) { let tail = loop { let head = self.inner.head.load(Acquire); let (steal, real) = unpack(head); @@ -117,11 +117,12 @@ impl Local { // Concurrently stealing, this will free up capacity, so only // push the task onto the inject queue inject.push(task); + stats.incr_overflow_count(1); return; } else { // Push the current task and half of the queue into the // inject queue. - match self.push_overflow(task, real, tail, inject) { + match self.push_overflow(task, real, tail, inject, stats) { Ok(_) => return, // Lost the race, try again Err(v) => { @@ -163,6 +164,7 @@ impl Local { head: u16, tail: u16, inject: &Inject, + stats: &mut WorkerStatsBatcher, ) -> Result<(), task::Notified> { /// How many elements are we taking from the local queue. /// @@ -246,6 +248,9 @@ impl Local { }; inject.push_batch(batch_iter.chain(std::iter::once(task))); + // Add 1 to factor in the task currently being scheduled. 
+ stats.incr_overflow_count(NUM_TASKS_TAKEN + 1); + Ok(()) } @@ -300,7 +305,8 @@ impl Steal { pub(super) fn steal_into( &self, dst: &mut Local, - stats: &mut WorkerStatsBatcher, + dst_stats: &mut WorkerStatsBatcher, + src_stats: &WorkerStats, ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. @@ -320,13 +326,15 @@ impl Steal { // Steal the tasks into `dst`'s buffer. This does not yet expose the // tasks in `dst`. let mut n = self.steal_into2(dst, dst_tail); - stats.incr_steal_count(n); if n == 0 { // No tasks were stolen return None; } + dst_stats.incr_steal_count(n); + src_stats.incr_stolen_count(n); + // We are returning a task here n -= 1; diff --git a/tokio/src/runtime/stats/mock.rs b/tokio/src/runtime/stats/mock.rs index 5477f99ffe6..a51e7d298d2 100644 --- a/tokio/src/runtime/stats/mock.rs +++ b/tokio/src/runtime/stats/mock.rs @@ -10,24 +10,35 @@ impl RuntimeStats { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) { } + + pub(crate) fn worker(&self, _index: usize) -> &WorkerStats { + &WorkerStats {} + } } +pub(crate) struct WorkerStats {} + pub(crate) struct WorkerStatsBatcher {} +impl WorkerStats { + pub(crate) fn incr_stolen_count(&self, _n: u16) {} +} + impl WorkerStatsBatcher { pub(crate) fn new(_my_index: usize) -> Self { Self {} } pub(crate) fn submit(&mut self, _to: &RuntimeStats) {} - pub(crate) fn about_to_park(&mut self) {} pub(crate) fn returned_from_park(&mut self) {} - - #[cfg(feature = "rt-multi-thread")] - pub(crate) fn incr_steal_count(&mut self, _by: u16) {} - pub(crate) fn incr_poll_count(&mut self) {} - pub(crate) fn inc_local_schedule_count(&mut self) {} } + +cfg_rt_multi_thread! 
+ { + impl WorkerStatsBatcher { + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + pub(crate) fn incr_overflow_count(&mut self, _by: u16) {} + } +} diff --git a/tokio/src/runtime/stats/mod.rs b/tokio/src/runtime/stats/mod.rs index 355e400602d..cc3fd47897f 100644 --- a/tokio/src/runtime/stats/mod.rs +++ b/tokio/src/runtime/stats/mod.rs @@ -19,5 +19,5 @@ cfg_not_stats! { #[path = "mock.rs"] mod stats; - pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher}; + pub(crate) use self::stats::{RuntimeStats, WorkerStats, WorkerStatsBatcher}; } diff --git a/tokio/src/runtime/stats/stats.rs b/tokio/src/runtime/stats/stats.rs index e80c0785a26..aecf51dde52 100644 --- a/tokio/src/runtime/stats/stats.rs +++ b/tokio/src/runtime/stats/stats.rs @@ -29,12 +29,29 @@ pub struct RuntimeStats { #[derive(Debug)] #[repr(align(128))] pub struct WorkerStats { + /// Number of times the worker parked. park_count: AtomicU64, + + /// Number of times the worker woke then parked again without doing work. noop_count: AtomicU64, + + /// Number of tasks the worker stole from other workers. steal_count: AtomicU64, + + /// Number of tasks the worker polled. poll_count: AtomicU64, + + /// Number of tasks stolen from the current worker. + stolen_count: AtomicU64, + + /// Total amount of time the worker spent doing work rather than parked. busy_duration_total: AtomicU64, + + /// Number of tasks scheduled for execution on the worker's local queue. local_schedule_count: AtomicU64, + + /// Number of tasks moved from the local queue to the global queue to free space. 
+ overflow_count: AtomicU64, } impl RuntimeStats { @@ -46,6 +63,8 @@ impl RuntimeStats { noop_count: AtomicU64::new(0), steal_count: AtomicU64::new(0), poll_count: AtomicU64::new(0), + stolen_count: AtomicU64::new(0), + overflow_count: AtomicU64::new(0), busy_duration_total: AtomicU64::new(0), local_schedule_count: AtomicU64::new(0), }); @@ -73,7 +92,11 @@ impl RuntimeStats { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) { self.remote_schedule_count.fetch_add(1, Relaxed); - } + } + + pub(crate) fn worker(&self, index: usize) -> &WorkerStats { + &self.workers[index] + } } impl WorkerStats { @@ -82,6 +105,9 @@ impl WorkerStats { self.park_count.load(Relaxed) } + /// Returns the number of times this worker unparked but performed no work. + /// + /// This is the false-positive wake count. pub fn noop_count(&self) -> u64 { self.noop_count.load(Relaxed) } @@ -92,6 +118,11 @@ impl WorkerStats { self.steal_count.load(Relaxed) } + /// Returns the number of tasks that were stolen from this worker. + pub fn stolen_count(&self) -> u64 { + self.stolen_count.load(Relaxed) + } + /// Returns the number of times this worker has polled a task. pub fn poll_count(&self) -> u64 { self.poll_count.load(Relaxed) @@ -106,9 +137,20 @@ impl WorkerStats { pub fn local_schedule_count(&self) -> u64 { self.local_schedule_count.load(Relaxed) } + + /// Returns the number of tasks moved from this worker's local queue to the + /// global queue. + pub fn overflow_count(&self) -> u64 { + self.overflow_count.load(Relaxed) + } + + pub(crate) fn incr_stolen_count(&self, n: u16) { + self.stolen_count.fetch_add(n as _, Relaxed); + } } pub(crate) struct WorkerStatsBatcher { + /// Identifies the worker within the runtime. my_index: usize, /// Number of times the worker parked @@ -130,6 +172,10 @@ pub(crate) struct WorkerStatsBatcher { /// Number of tasks that were scheduled locally on this worker. 
local_schedule_count: u64, + /// Number of tasks moved to the global queue to make space in the local + /// queue + overflow_count: u64, + /// The total busy duration in nanoseconds. busy_duration_total: u64, last_resume_time: Instant, @@ -145,6 +191,7 @@ impl WorkerStatsBatcher { poll_count: 0, poll_count_on_last_park: 0, local_schedule_count: 0, + overflow_count: 0, busy_duration_total: 0, last_resume_time: Instant::now(), } @@ -162,6 +209,7 @@ impl WorkerStatsBatcher { .store(self.busy_duration_total, Relaxed); worker.local_schedule_count.store(self.local_schedule_count, Relaxed); + worker.overflow_count.store(self.overflow_count, Relaxed); } /// The worker is about to park. @@ -187,12 +235,19 @@ impl WorkerStatsBatcher { self.local_schedule_count += 1; } - #[cfg(feature = "rt-multi-thread")] - pub(crate) fn incr_steal_count(&mut self, by: u16) { - self.steal_count += u64::from(by); - } - pub(crate) fn incr_poll_count(&mut self) { self.poll_count += 1; } } + +cfg_rt_multi_thread! { + impl WorkerStatsBatcher { + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.steal_count += by as u64; + } + + pub(crate) fn incr_overflow_count(&mut self, by: u16) { + self.overflow_count += by as u64; + } + } +} diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index a4883c698b1..f4ddb654a15 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -445,7 +445,7 @@ impl Context { } else { // Not enough budget left to run the LIFO task, push it to // the back of the queue and return. 
- core.run_queue.push_back(task, self.worker.inject()); + core.run_queue.push_back(task, self.worker.inject(), &mut core.stats); return Ok(core); } } @@ -557,9 +557,10 @@ impl Core { } let target = &worker.shared.remotes[i]; + let target_stats = worker.shared.stats.worker(i); if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.stats) + .steal_into(&mut self.run_queue, &mut self.stats, target_stats) { return Some(task); } @@ -731,7 +732,7 @@ impl Shared { // tasks to be executed. If **not** a yield, then there is more // flexibility and the task may go to the front of the queue. let should_notify = if is_yield { - core.run_queue.push_back(task, &self.inject); + core.run_queue.push_back(task, &self.inject, &mut core.stats); true } else { // Push to the LIFO slot @@ -739,7 +740,7 @@ impl Shared { let ret = prev.is_some(); if let Some(prev) = prev { - core.run_queue.push_back(prev, &self.inject); + core.run_queue.push_back(prev, &self.inject, &mut core.stats); } core.lifo_slot = Some(task);