Skip to content

Commit

Permalink
instrument queue stolen count and overflow count
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Jan 11, 2022
1 parent 60c7152 commit fbb40a6
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 22 deletions.
18 changes: 13 additions & 5 deletions tokio/src/runtime/queue.rs
Expand Up @@ -3,7 +3,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::stats::{WorkerStats, WorkerStatsBatcher};
use crate::runtime::task::{self, Inject};

use std::mem::MaybeUninit;
Expand Down Expand Up @@ -102,7 +102,7 @@ impl<T> Local<T> {
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>, stats: &mut WorkerStatsBatcher) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
Expand All @@ -117,11 +117,12 @@ impl<T> Local<T> {
// Concurrently stealing, this will free up capacity, so only
// push the task onto the inject queue
inject.push(task);
stats.incr_overflow_count(1);
return;
} else {
// Push the current task and half of the queue into the
// inject queue.
match self.push_overflow(task, real, tail, inject) {
match self.push_overflow(task, real, tail, inject, stats) {
Ok(_) => return,
// Lost the race, try again
Err(v) => {
Expand Down Expand Up @@ -163,6 +164,7 @@ impl<T> Local<T> {
head: u16,
tail: u16,
inject: &Inject<T>,
stats: &mut WorkerStatsBatcher,
) -> Result<(), task::Notified<T>> {
/// How many elements are we taking from the local queue.
///
Expand Down Expand Up @@ -246,6 +248,9 @@ impl<T> Local<T> {
};
inject.push_batch(batch_iter.chain(std::iter::once(task)));

// Add 1 to factor in the task currently being scheduled.
stats.incr_overflow_count(NUM_TASKS_TAKEN + 1);

Ok(())
}

Expand Down Expand Up @@ -300,7 +305,8 @@ impl<T> Steal<T> {
pub(super) fn steal_into(
&self,
dst: &mut Local<T>,
stats: &mut WorkerStatsBatcher,
dst_stats: &mut WorkerStatsBatcher,
src_stats: &WorkerStats,
) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
Expand All @@ -320,13 +326,15 @@ impl<T> Steal<T> {
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
stats.incr_steal_count(n);

if n == 0 {
// No tasks were stolen
return None;
}

dst_stats.incr_steal_count(n);
src_stats.incr_stolen_count(n);

// We are returning a task here
n -= 1;

Expand Down
23 changes: 17 additions & 6 deletions tokio/src/runtime/stats/mock.rs
Expand Up @@ -10,24 +10,35 @@ impl RuntimeStats {
/// Increment the number of tasks scheduled externally
pub(crate) fn inc_remote_schedule_count(&self) {
}

pub(crate) fn worker(&self, _index: usize) -> &WorkerStats {
&WorkerStats {}
}
}

pub(crate) struct WorkerStats {}

pub(crate) struct WorkerStatsBatcher {}

impl WorkerStats {
pub(crate) fn incr_stolen_count(&self, _n: u16) {}
}

impl WorkerStatsBatcher {
pub(crate) fn new(_my_index: usize) -> Self {
Self {}
}

pub(crate) fn submit(&mut self, _to: &RuntimeStats) {}

pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn returned_from_park(&mut self) {}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}

pub(crate) fn incr_poll_count(&mut self) {}

pub(crate) fn inc_local_schedule_count(&mut self) {}
}

cfg_rt_multi_thread! {
impl WorkerStatsBatcher {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
pub(crate) fn incr_overflow_count(&mut self, _by: u16) {}
}
}
2 changes: 1 addition & 1 deletion tokio/src/runtime/stats/mod.rs
Expand Up @@ -19,5 +19,5 @@ cfg_not_stats! {
#[path = "mock.rs"]
mod stats;

pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher};
pub(crate) use self::stats::{RuntimeStats, WorkerStats, WorkerStatsBatcher};
}
67 changes: 61 additions & 6 deletions tokio/src/runtime/stats/stats.rs
Expand Up @@ -29,12 +29,29 @@ pub struct RuntimeStats {
#[derive(Debug)]
#[repr(align(128))]
pub struct WorkerStats {
/// Number of times the worker parked.
park_count: AtomicU64,

/// Number of times the worker woke then parked again without doing work.
noop_count: AtomicU64,

/// Number of times the worker attempted to steal.
steal_count: AtomicU64,

/// Number of tasks the worker polled.
poll_count: AtomicU64,

/// Number of tasks stolen from the current worker.
stolen_count: AtomicU64,

/// Amount of time the worker spent doing work vs. parking.
busy_duration_total: AtomicU64,

/// Number of tasks scheduled for execution on the worker's local queue.
local_schedule_count: AtomicU64,

/// Number of tasks moved from the local queue to the global queue to free space.
overflow_count: AtomicU64,
}

impl RuntimeStats {
Expand All @@ -46,6 +63,8 @@ impl RuntimeStats {
noop_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
stolen_count: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
local_schedule_count: AtomicU64::new(0),
});
Expand Down Expand Up @@ -73,7 +92,11 @@ impl RuntimeStats {
/// Increment the number of tasks scheduled externally
pub(crate) fn inc_remote_schedule_count(&self) {
self.remote_schedule_count.fetch_add(1, Relaxed);
}
}

pub(crate) fn worker(&self, index: usize) -> &WorkerStats {
&self.workers[index]
}
}

impl WorkerStats {
Expand All @@ -82,6 +105,9 @@ impl WorkerStats {
self.park_count.load(Relaxed)
}

/// Returns the number of times this worker unparked but performed no work.
///
/// This is the false-positive wake count.
pub fn noop_count(&self) -> u64 {
self.noop_count.load(Relaxed)
}
Expand All @@ -92,6 +118,11 @@ impl WorkerStats {
self.steal_count.load(Relaxed)
}

/// Returns the number of tasks that were stolen from this worker.
pub fn stolen_count(&self) -> u64 {
self.stolen_count.load(Relaxed)
}

/// Returns the number of times this worker has polled a task.
pub fn poll_count(&self) -> u64 {
self.poll_count.load(Relaxed)
Expand All @@ -106,9 +137,20 @@ impl WorkerStats {
pub fn local_schedule_count(&self) -> u64 {
self.local_schedule_count.load(Relaxed)
}

/// Returns the number of tasks moved from this worker's local queue to the
/// remote queue.
pub fn overflow_count(&self) -> u64 {
self.overflow_count.load(Relaxed)
}

pub(crate) fn incr_stolen_count(&self, n: u16) {
self.stolen_count.fetch_add(n as _, Relaxed);
}
}

pub(crate) struct WorkerStatsBatcher {
/// Identifies the worker within the runtime.
my_index: usize,

/// Number of times the worker parked
Expand All @@ -130,6 +172,10 @@ pub(crate) struct WorkerStatsBatcher {
/// Number of tasks that were scheduled locally on this worker.
local_schedule_count: u64,

/// Number of tasks moved to the global queue to make space in the local
/// queue
overflow_count: u64,

/// The total busy duration in nanoseconds.
busy_duration_total: u64,
last_resume_time: Instant,
Expand All @@ -145,6 +191,7 @@ impl WorkerStatsBatcher {
poll_count: 0,
poll_count_on_last_park: 0,
local_schedule_count: 0,
overflow_count: 0,
busy_duration_total: 0,
last_resume_time: Instant::now(),
}
Expand All @@ -162,6 +209,7 @@ impl WorkerStatsBatcher {
.store(self.busy_duration_total, Relaxed);

worker.local_schedule_count.store(self.local_schedule_count, Relaxed);
worker.overflow_count.store(self.overflow_count, Relaxed);
}

/// The worker is about to park.
Expand All @@ -187,12 +235,19 @@ impl WorkerStatsBatcher {
self.local_schedule_count += 1;
}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += u64::from(by);
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}
}

cfg_rt_multi_thread! {
impl WorkerStatsBatcher {
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += by as u64;
}

pub(crate) fn incr_overflow_count(&mut self, by: u16) {
self.overflow_count += by as u64;
}
}
}
9 changes: 5 additions & 4 deletions tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -445,7 +445,7 @@ impl Context {
} else {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue.push_back(task, self.worker.inject());
core.run_queue.push_back(task, self.worker.inject(), &mut core.stats);
return Ok(core);
}
}
Expand Down Expand Up @@ -557,9 +557,10 @@ impl Core {
}

let target = &worker.shared.remotes[i];
let target_stats = worker.shared.stats.worker(i);
if let Some(task) = target
.steal
.steal_into(&mut self.run_queue, &mut self.stats)
.steal_into(&mut self.run_queue, &mut self.stats, target_stats)
{
return Some(task);
}
Expand Down Expand Up @@ -731,15 +732,15 @@ impl Shared {
// tasks to be executed. If **not** a yield, then there is more
// flexibility and the task may go to the front of the queue.
let should_notify = if is_yield {
core.run_queue.push_back(task, &self.inject);
core.run_queue.push_back(task, &self.inject, &mut core.stats);
true
} else {
// Push to the LIFO slot
let prev = core.lifo_slot.take();
let ret = prev.is_some();

if let Some(prev) = prev {
core.run_queue.push_back(prev, &self.inject);
core.run_queue.push_back(prev, &self.inject, &mut core.stats);
}

core.lifo_slot = Some(task);
Expand Down

0 comments on commit fbb40a6

Please sign in to comment.