Skip to content

Commit

Permalink
metrics: Fix steal_count description and add steal_operations
Browse files Browse the repository at this point in the history
`steal_count` claimed to be tracking the amount of times a worker
successfully stole tasks, but instead tracked the number of tasks
stolen. Multiple tasks can be stolen at once.
Add the new `steal_operations` metric to be able to track how
many times tasks where stolen.
Tracking both metrics allows us to better gauge effects of
different stealing policies on the work stealing.
  • Loading branch information
jschwe committed Jan 27, 2023
1 parent 440ea2d commit fd4ad4c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 6 deletions.
13 changes: 12 additions & 1 deletion tokio/src/runtime/metrics/batch.rs
Expand Up @@ -11,9 +11,12 @@ pub(crate) struct MetricsBatch {
/// Number of times the worker woke w/o doing work.
noop_count: u64,

/// Number of times stolen.
/// Number of tasks stolen.
steal_count: u64,

/// Number of times tasks where stolen.
steal_operations: u64,

/// Number of tasks that were polled by the worker.
poll_count: u64,

Expand All @@ -39,6 +42,7 @@ impl MetricsBatch {
park_count: 0,
noop_count: 0,
steal_count: 0,
steal_operations: 0,
poll_count: 0,
poll_count_on_last_park: 0,
local_schedule_count: 0,
Expand All @@ -52,6 +56,9 @@ impl MetricsBatch {
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker
.steal_operations
.store(self.steal_operations, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);

worker
Expand Down Expand Up @@ -98,6 +105,10 @@ cfg_rt_multi_thread! {
self.steal_count += by as u64;
}

pub(crate) fn incr_steal_operations(&mut self) {
self.steal_operations += 1;
}

pub(crate) fn incr_overflow_count(&mut self) {
self.overflow_count += 1;
}
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/metrics/mock.rs
Expand Up @@ -38,6 +38,7 @@ impl MetricsBatch {
cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}
55 changes: 51 additions & 4 deletions tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -210,10 +210,57 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the number of tasks the given worker thread stole from
/// another worker thread.
///
/// This metric only applies to the **multi-threaded** runtime and will
/// always return `0` when using the current thread runtime.
///
/// The worker steal count starts at zero when the runtime is created and
/// increases by `N` each time the worker has processed its scheduled queue
/// and successfully steals `N` more pending tasks from another worker.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_steal_count(0);
/// println!("worker 0 has stolen {} tasks", n);
/// }
/// ```
pub fn worker_steal_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.steal_count
.load(Relaxed)
}

/// Returns the number of times the given worker thread stole tasks from
/// another worker thread.
///
/// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime.
/// This metric only applies to the **multi-threaded** runtime and will
/// always return `0` when using the current thread runtime.
///
/// The worker steal count starts at zero when the runtime is created and
/// increases by one each time the worker has processed its scheduled queue
Expand Down Expand Up @@ -243,15 +290,15 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_noop_count(0);
/// let n = metrics.worker_steal_operations(0);
/// println!("worker 0 has stolen tasks {} times", n);
/// }
/// ```
pub fn worker_steal_count(&self, worker: usize) -> u64 {
pub fn worker_steal_operations(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.steal_count
.steal_operations
.load(Relaxed)
}

Expand Down
6 changes: 5 additions & 1 deletion tokio/src/runtime/metrics/worker.rs
Expand Up @@ -17,9 +17,12 @@ pub(crate) struct WorkerMetrics {
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: AtomicU64,

/// Number of times the worker attempted to steal.
/// Number of tasks the worker stole.
pub(crate) steal_count: AtomicU64,

/// Number of times the worker stole
pub(crate) steal_operations: AtomicU64,

/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,

Expand All @@ -43,6 +46,7 @@ impl WorkerMetrics {
park_count: AtomicU64::new(0),
noop_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/scheduler/multi_thread/queue.rs
Expand Up @@ -353,6 +353,7 @@ impl<T> Steal<T> {
}

dst_metrics.incr_steal_count(n as u16);
dst_metrics.incr_steal_operations();

// We are returning a task here
n -= 1;
Expand Down

0 comments on commit fd4ad4c

Please sign in to comment.