From 904c538f70f87c7703ffe3817b6cc63842c2cfcc Mon Sep 17 00:00:00 2001 From: Jonathan Schwender <55576758+jschwe@users.noreply.github.com> Date: Mon, 6 Mar 2023 17:24:33 +0100 Subject: [PATCH] rt: add steal_operations metric (#37) Follow-up to #35. The new metric is available starting with tokio v1.25.0. --- Cargo.toml | 2 +- README.md | 9 +++++ src/runtime.rs | 99 +++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a5ca53e..82d2acc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ futures = "0.3.21" num_cpus = "1.13.1" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -tokio = { version = "1.15.0", features = ["full", "rt", "time", "macros", "test-util"] } +tokio = { version = "1.26.0", features = ["full", "rt", "time", "macros", "test-util"] } [[example]] name = "runtime" diff --git a/README.md b/README.md index 991326c..7a46b06 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,12 @@ tokio::spawn(do_work()); The maximum number of tasks any worker thread stole from another worker thread. - **[`min_steal_count`]** The minimum number of tasks any worker thread stole from another worker thread. +- **[`total_steal_operations`]** + The number of times worker threads stole tasks from another worker thread. +- **[`max_steal_operations`]** + The maximum number of times any worker thread stole tasks from another worker thread. +- **[`min_steal_operations`]** + The minimum number of times any worker thread stole tasks from another worker thread. - **[`num_remote_schedules`]** The number of tasks scheduled from outside of the runtime. - **[`total_local_schedule_count`]** @@ -233,6 +239,9 @@ tokio::spawn(do_work()); [`total_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_count [`max_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_count [`min_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_count +[`total_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_operations +[`max_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_operations +[`min_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_operations [`num_remote_schedules`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.num_remote_schedules [`total_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_local_schedule_count [`max_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_local_schedule_count diff --git a/src/runtime.rs b/src/runtime.rs index d9bcef3..094a053 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -256,7 +256,7 @@ pub struct RuntimeMetrics { /// The number of tasks worker threads stole from another worker thread. /// - /// The worker steal count starts increases by the amount of stolen tasks each time the worker + /// The worker steal count increases by the amount of stolen tasks each time the worker /// has processed its scheduled queue and successfully steals more pending tasks from another /// worker. /// @@ -344,6 +344,95 @@ pub struct RuntimeMetrics { /// - [`RuntimeMetrics::max_steal_count`] pub min_steal_count: u64, + /// The number of times worker threads stole tasks from another worker thread. + /// + /// The worker steal operations increases by one each time the worker has processed its + /// scheduled queue and successfully steals more pending tasks from another worker. + /// + /// This metric only applies to the **multi-threaded** runtime and will always return `0` when + /// using the current thread runtime. + /// + /// ##### Definition + /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] + /// for all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::min_steal_operations`] + /// - [`RuntimeMetrics::max_steal_operations`] + /// + /// ##### Examples + /// In the below example, a blocking channel is used to backup one worker thread: + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of first sampling interval + /// assert_eq!(interval.total_steal_operations, 0); + /// assert_eq!(interval.min_steal_operations, 0); + /// assert_eq!(interval.max_steal_operations, 0); + /// + /// // induce a steal + /// async { + /// let (tx, rx) = std::sync::mpsc::channel(); + /// // Move to the runtime. + /// tokio::spawn(async move { + /// // Spawn the task that sends to the channel + /// tokio::spawn(async move { + /// tx.send(()).unwrap(); + /// }); + /// // Spawn a task that bumps the previous task out of the "next + /// // scheduled" slot. + /// tokio::spawn(async {}); + /// // Blocking receive on the channe. + /// rx.recv().unwrap(); + /// flush_metrics().await; + /// }).await.unwrap(); + /// flush_metrics().await; + /// }.await; + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 + /// assert_eq!(interval.total_steal_operations, 1); + /// assert_eq!(interval.min_steal_operations, 0); + /// assert_eq!(interval.max_steal_operations, 1); + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 + /// assert_eq!(interval.total_steal_operations, 0); + /// assert_eq!(interval.min_steal_operations, 0); + /// assert_eq!(interval.max_steal_operations, 0); + /// } + /// + /// async fn flush_metrics() { + /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; + /// } + /// ``` + pub total_steal_operations: u64, + + /// The maximum number of times any worker thread stole tasks from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_operations`] + /// - [`RuntimeMetrics::min_steal_operations`] + pub max_steal_operations: u64, + + /// The minimum number of times any worker thread stole tasks from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_operations`] + /// - [`RuntimeMetrics::max_steal_operations`] + pub min_steal_operations: u64, + /// The number of tasks scheduled from **outside** of the runtime. /// /// The remote schedule count increases by one each time a task is woken from **outside** of @@ -959,6 +1048,7 @@ struct Worker { total_park_count: u64, total_noop_count: u64, total_steal_count: u64, + total_steal_operations: u64, total_local_schedule_count: u64, total_overflow_count: u64, total_polls_count: u64, @@ -1106,6 +1196,7 @@ impl Worker { total_park_count: rt.worker_park_count(worker), total_noop_count: rt.worker_noop_count(worker), total_steal_count: rt.worker_steal_count(worker), + total_steal_operations: rt.worker_steal_operations(worker), total_local_schedule_count: rt.worker_local_schedule_count(worker), total_overflow_count: rt.worker_overflow_count(worker), total_polls_count: rt.worker_poll_count(worker), @@ -1150,6 +1241,12 @@ impl Worker { min_steal_count, worker_steal_count ); + metric!( + total_steal_operations, + max_steal_operations, + min_steal_operations, + worker_steal_operations + ); metric!( total_local_schedule_count, max_local_schedule_count,