Add steal_operations metric #37

Merged
merged 1 commit into from Mar 6, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -31,7 +31,7 @@ futures = "0.3.21"
num_cpus = "1.13.1"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
-tokio = { version = "1.15.0", features = ["full", "rt", "time", "macros", "test-util"] }
+tokio = { version = "1.26.0", features = ["full", "rt", "time", "macros", "test-util"] }
Contributor

Not sure why this change is here. This was already set. I wonder if GitHub's viewer is bugged?

Contributor Author

This one was for the dev-dependency.

Contributor

Ah


[[example]]
name = "runtime"
9 changes: 9 additions & 0 deletions README.md
@@ -182,6 +182,12 @@ tokio::spawn(do_work());
The maximum number of tasks any worker thread stole from another worker thread.
- **[`min_steal_count`]**
The minimum number of tasks any worker thread stole from another worker thread.
- **[`total_steal_operations`]**
The number of times worker threads stole tasks from another worker thread.
- **[`max_steal_operations`]**
The maximum number of times any worker thread stole tasks from another worker thread.
- **[`min_steal_operations`]**
The minimum number of times any worker thread stole tasks from another worker thread.
- **[`num_remote_schedules`]**
The number of tasks scheduled from outside of the runtime.
- **[`total_local_schedule_count`]**
@@ -233,6 +239,9 @@ tokio::spawn(do_work());
[`total_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_count
[`max_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_count
[`min_steal_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_count
[`total_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_steal_operations
[`max_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_steal_operations
[`min_steal_operations`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.min_steal_operations
[`num_remote_schedules`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.num_remote_schedules
[`total_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.total_local_schedule_count
[`max_local_schedule_count`]: https://docs.rs/tokio-metrics/0.1.*/tokio_metrics/struct.RuntimeMetrics.html#structfield.max_local_schedule_count
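
The README entries above describe two related families: the `*_steal_count` fields count stolen tasks, while the new `*_steal_operations` fields count how many times a steal succeeded. A single steal can take several tasks, so the two numbers can differ. The following is a minimal sketch of that distinction only. `StealEvent`, `tally`, and `tasks_per_steal` are hypothetical names made up for illustration and are not part of tokio or tokio-metrics.

```rust
/// Hypothetical record of one successful steal.
/// This is NOT a tokio or tokio-metrics type; it exists only for illustration.
#[derive(Debug, Clone, Copy)]
pub struct StealEvent {
    /// Number of tasks taken from the other worker in this one steal.
    pub tasks_taken: u64,
}

/// Returns `(steal_count, steal_operations)` for one worker's events.
/// `steal_count` grows by the number of tasks taken per steal,
/// `steal_operations` grows by one per steal.
pub fn tally(events: &[StealEvent]) -> (u64, u64) {
    let steal_count: u64 = events.iter().map(|e| e.tasks_taken).sum();
    let steal_operations = events.len() as u64;
    (steal_count, steal_operations)
}

/// Average number of tasks taken per steal, or `None` when nothing was stolen.
pub fn tasks_per_steal(steal_count: u64, steal_operations: u64) -> Option<f64> {
    if steal_operations == 0 {
        None
    } else {
        Some(steal_count as f64 / steal_operations as f64)
    }
}
```

With real data, the same ratio could presumably be computed from `total_steal_count` and `total_steal_operations` of one sampling interval, guarding against a zero denominator as above.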
99 changes: 98 additions & 1 deletion src/runtime.rs
@@ -256,7 +256,7 @@ pub struct RuntimeMetrics {

/// The number of tasks worker threads stole from another worker thread.
///
-/// The worker steal count starts increases by the amount of stolen tasks each time the worker
+/// The worker steal count increases by the amount of stolen tasks each time the worker
/// has processed its scheduled queue and successfully steals more pending tasks from another
/// worker.
///
@@ -344,6 +344,95 @@ pub struct RuntimeMetrics {
/// - [`RuntimeMetrics::max_steal_count`]
pub min_steal_count: u64,

/// The number of times worker threads stole tasks from another worker thread.
///
/// The worker steal operations count increases by one each time the worker has processed its
/// scheduled queue and successfully steals more pending tasks from another worker.
///
/// This metric only applies to the **multi-threaded** runtime and will always return `0` when
/// using the current thread runtime.
///
/// ##### Definition
/// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// for all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::min_steal_operations`]
/// - [`RuntimeMetrics::max_steal_operations`]
///
/// ##### Examples
/// In the example below, a blocking channel is used to back up one worker thread:
/// ```
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
///     let handle = tokio::runtime::Handle::current();
///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
///     let mut intervals = monitor.intervals();
///     let mut next_interval = || intervals.next().unwrap();
///
///     let interval = next_interval(); // end of first sampling interval
///     assert_eq!(interval.total_steal_operations, 0);
///     assert_eq!(interval.min_steal_operations, 0);
///     assert_eq!(interval.max_steal_operations, 0);
///
///     // induce a steal
///     async {
///         let (tx, rx) = std::sync::mpsc::channel();
///         // Move to the runtime.
///         tokio::spawn(async move {
///             // Spawn the task that sends to the channel
///             tokio::spawn(async move {
///                 tx.send(()).unwrap();
///             });
///             // Spawn a task that bumps the previous task out of the "next
///             // scheduled" slot.
///             tokio::spawn(async {});
///             // Blocking receive on the channel.
///             rx.recv().unwrap();
///             flush_metrics().await;
///         }).await.unwrap();
///         flush_metrics().await;
///     }.await;
///
///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
///     assert_eq!(interval.total_steal_operations, 1);
///     assert_eq!(interval.min_steal_operations, 0);
///     assert_eq!(interval.max_steal_operations, 1);
///
///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
///     assert_eq!(interval.total_steal_operations, 0);
///     assert_eq!(interval.min_steal_operations, 0);
///     assert_eq!(interval.max_steal_operations, 0);
/// }
///
/// async fn flush_metrics() {
///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
/// }
/// ```
pub total_steal_operations: u64,

/// The maximum number of times any worker thread stole tasks from another worker thread.
///
/// ##### Definition
/// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// across all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::total_steal_operations`]
/// - [`RuntimeMetrics::min_steal_operations`]
pub max_steal_operations: u64,

/// The minimum number of times any worker thread stole tasks from another worker thread.
///
/// ##### Definition
/// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
/// across all worker threads.
///
/// ##### See also
/// - [`RuntimeMetrics::total_steal_operations`]
/// - [`RuntimeMetrics::max_steal_operations`]
pub min_steal_operations: u64,

/// The number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count increases by one each time a task is woken from **outside** of
@@ -959,6 +1048,7 @@ struct Worker {
total_park_count: u64,
total_noop_count: u64,
total_steal_count: u64,
total_steal_operations: u64,
total_local_schedule_count: u64,
total_overflow_count: u64,
total_polls_count: u64,
@@ -1106,6 +1196,7 @@ impl Worker {
total_park_count: rt.worker_park_count(worker),
total_noop_count: rt.worker_noop_count(worker),
total_steal_count: rt.worker_steal_count(worker),
total_steal_operations: rt.worker_steal_operations(worker),
total_local_schedule_count: rt.worker_local_schedule_count(worker),
total_overflow_count: rt.worker_overflow_count(worker),
total_polls_count: rt.worker_poll_count(worker),
@@ -1150,6 +1241,12 @@ impl Worker {
min_steal_count,
worker_steal_count
);
metric!(
total_steal_operations,
max_steal_operations,
min_steal_operations,
worker_steal_operations
);
metric!(
total_local_schedule_count,
max_local_schedule_count,
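
The body of the `metric!` macro is not shown in this diff, so the following is only a sketch of one plausible reading of the field docs: each worker's cumulative `worker_steal_operations` counter is turned into a per-interval delta, and the deltas are folded into a total, a maximum, and a minimum. `StealOperationsSummary` and `summarize` are hypothetical names, and the delta-based approach is an assumption, not the crate's confirmed implementation.

```rust
/// Hypothetical per-interval summary; the field names mirror the
/// total/max/min naming used in the diff but this type is illustrative only.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct StealOperationsSummary {
    pub total: u64,
    pub max: u64,
    pub min: u64,
}

/// Folds per-worker counters into one summary for a sampling interval.
///
/// `previous` and `current` hold cumulative steal-operation counters, one
/// entry per worker, sampled at the start and end of the interval.
/// ASSUMPTION: the real macro may compute this differently.
pub fn summarize(previous: &[u64], current: &[u64]) -> StealOperationsSummary {
    // Per-worker change over the interval; saturating_sub avoids underflow
    // if a counter were ever observed to go backwards.
    let deltas: Vec<u64> = previous
        .iter()
        .zip(current)
        .map(|(prev, cur)| cur.saturating_sub(*prev))
        .collect();

    StealOperationsSummary {
        total: deltas.iter().sum(),
        // With no workers, report zero for both extremes.
        max: deltas.iter().copied().max().unwrap_or(0),
        min: deltas.iter().copied().min().unwrap_or(0),
    }
}
```

Under this reading, two workers where only one performed a single steal during the interval would yield a total of 1, a maximum of 1, and a minimum of 0, which is consistent with the assertions for the second interval in the doc example.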