Skip to content

Commit

Permalink
rt: add steal_operations metric (#37)
Browse files Browse the repository at this point in the history
Follow-up to #35.

The new metric is available starting with tokio v1.25.0.
  • Loading branch information
jschwe committed Mar 6, 2023
1 parent 234690c commit 904c538
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -31,7 +31,7 @@ futures = "0.3.21"
num_cpus = "1.13.1"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
tokio = { version = "1.15.0", features = ["full", "rt", "time", "macros", "test-util"] }
tokio = { version = "1.26.0", features = ["full", "rt", "time", "macros", "test-util"] }

[[example]]
name = "runtime"
Expand Down
9 changes: 9 additions & 0 deletions README.md
Expand Up @@ -182,6 +182,12 @@ tokio::spawn(do_work());
The maximum number of tasks any worker thread stole from another worker thread.
- **[`min_steal_count`]**
The minimum number of tasks any worker thread stole from another worker thread.
- **[`total_steal_operations`]**
The number of times worker threads stole tasks from another worker thread.
- **[`max_steal_operations`]**
The maximum number of times any worker thread stole tasks from another worker thread.
- **[`min_steal_operations`]**
The minimum number of times any worker thread stole tasks from another worker thread.
- **[`num_remote_schedules`]**
The number of tasks scheduled from outside of the runtime.
- **[`total_local_schedule_count`]**
Expand Down Expand Up @@ -233,6 +239,9 @@ tokio::spawn(do_work());
[`total_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_count
[`max_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_count
[`min_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_count
[`total_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_operations
[`max_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_operations
[`min_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_operations
[`num_remote_schedules`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.num_remote_schedules
[`total_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_local_schedule_count
[`max_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_local_schedule_count
Expand Down
99 changes: 98 additions & 1 deletion src/runtime.rs
Expand Up @@ -256,7 +256,7 @@ pub struct RuntimeMetrics {

/// The number of tasks worker threads stole from another worker thread.
///
/// The worker steal count starts increases by the amount of stolen tasks each time the worker
/// The worker steal count increases by the amount of stolen tasks each time the worker
/// has processed its scheduled queue and successfully steals more pending tasks from another
/// worker.
///
Expand Down Expand Up @@ -344,6 +344,95 @@ pub struct RuntimeMetrics {
/// - [`RuntimeMetrics::max_steal_count`]
pub min_steal_count: u64,

/// The number of times worker threads stole tasks from another worker thread.
///
/// The worker steal operations increases by one each time the worker has processed its
/// scheduled queue and successfully steals more pending tasks from another worker.
///
/// This metric only applies to the **multi-threaded** runtime and will always return `0` when
/// using the current thread runtime.
///
/// ##### Definition
/// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// for all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::min_steal_operations`]
/// - [`RuntimeMetrics::max_steal_operations`]
///
/// ##### Examples
/// In the below example, a blocking channel is used to backup one worker thread:
/// ```
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
/// let handle = tokio::runtime::Handle::current();
/// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
/// let mut intervals = monitor.intervals();
/// let mut next_interval = || intervals.next().unwrap();
///
/// let interval = next_interval(); // end of first sampling interval
/// assert_eq!(interval.total_steal_operations, 0);
/// assert_eq!(interval.min_steal_operations, 0);
/// assert_eq!(interval.max_steal_operations, 0);
///
/// // induce a steal
/// async {
/// let (tx, rx) = std::sync::mpsc::channel();
/// // Move to the runtime.
/// tokio::spawn(async move {
/// // Spawn the task that sends to the channel
/// tokio::spawn(async move {
/// tx.send(()).unwrap();
/// });
/// // Spawn a task that bumps the previous task out of the "next
/// // scheduled" slot.
/// tokio::spawn(async {});
/// // Blocking receive on the channe.
/// rx.recv().unwrap();
/// flush_metrics().await;
/// }).await.unwrap();
/// flush_metrics().await;
/// }.await;
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
/// assert_eq!(interval.total_steal_operations, 1);
/// assert_eq!(interval.min_steal_operations, 0);
/// assert_eq!(interval.max_steal_operations, 1);
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
/// assert_eq!(interval.total_steal_operations, 0);
/// assert_eq!(interval.min_steal_operations, 0);
/// assert_eq!(interval.max_steal_operations, 0);
/// }
///
/// async fn flush_metrics() {
/// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
/// }
/// ```
pub total_steal_operations: u64,

/// The maximum number of times any worker thread stole tasks from another worker thread.
///
/// ##### Definition
/// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// across all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::total_steal_operations`]
/// - [`RuntimeMetrics::min_steal_operations`]
pub max_steal_operations: u64,

/// The minimum number of times any worker thread stole tasks from another worker thread.
///
/// ##### Definition
/// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// across all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::total_steal_operations`]
/// - [`RuntimeMetrics::max_steal_operations`]
pub min_steal_operations: u64,

/// The number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count increases by one each time a task is woken from **outside** of
Expand Down Expand Up @@ -959,6 +1048,7 @@ struct Worker {
total_park_count: u64,
total_noop_count: u64,
total_steal_count: u64,
total_steal_operations: u64,
total_local_schedule_count: u64,
total_overflow_count: u64,
total_polls_count: u64,
Expand Down Expand Up @@ -1106,6 +1196,7 @@ impl Worker {
total_park_count: rt.worker_park_count(worker),
total_noop_count: rt.worker_noop_count(worker),
total_steal_count: rt.worker_steal_count(worker),
total_steal_operations: rt.worker_steal_operations(worker),
total_local_schedule_count: rt.worker_local_schedule_count(worker),
total_overflow_count: rt.worker_overflow_count(worker),
total_polls_count: rt.worker_poll_count(worker),
Expand Down Expand Up @@ -1150,6 +1241,12 @@ impl Worker {
min_steal_count,
worker_steal_count
);
metric!(
total_steal_operations,
max_steal_operations,
min_steal_operations,
worker_steal_operations
);
metric!(
total_local_schedule_count,
max_local_schedule_count,
Expand Down

0 comments on commit 904c538

Please sign in to comment.