Skip to content

Commit

Permalink
rt: support mean poll duration metric (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Aug 14, 2023
1 parent ba9ad8c commit ceb1b1d
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -22,7 +22,7 @@ rt = ["tokio"]
tokio-stream = "0.1.11"
futures-util = "0.3.19"
pin-project-lite = "0.2.7"
tokio = { version = "1.26.0", features = ["rt", "stats", "time", "net"], optional = true }
tokio = { version = "1.31.0", features = ["rt", "stats", "time", "net"], optional = true }

[dev-dependencies]
axum = "0.6"
Expand All @@ -47,4 +47,4 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"]
# it's necessary to _also_ pass `--cfg tokio_unstable` to rustc, or else
# dependencies will not be enabled, and the docs build will fail.
rustc-args = ["--cfg", "tokio_unstable"]
rustc-args = ["--cfg", "tokio_unstable"]
116 changes: 104 additions & 12 deletions src/runtime.rs
Expand Up @@ -178,6 +178,68 @@ pub struct RuntimeMetrics {
/// - [`RuntimeMetrics::max_park_count`]
pub min_park_count: u64,

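/// The average duration of a single invocation of poll on a task.
///
/// This average is an exponentially-weighted moving average of the duration
/// of task polls on all runtime workers. Each worker's average is weighted
/// by the number of polls that worker performed since the previous probe.
/// If no tasks were polled, the value is zero.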
///
/// ##### Examples
/// ```
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
/// let handle = tokio::runtime::Handle::current();
/// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
/// let mut intervals = monitor.intervals();
/// let mut next_interval = || intervals.next().unwrap();
///
/// let interval = next_interval();
/// println!("mean task poll duration is {:?}", interval.mean_poll_duration);
/// }
/// ```
pub mean_poll_duration: Duration,

/// The average duration of a single invocation of poll on a task on the
/// worker with the lowest mean poll duration.
///
/// This average is an exponentially-weighted moving average of the duration
/// of task polls on the runtime worker whose mean poll duration is the
/// lowest. If no tasks were polled, the value is zero.
///
/// ##### Examples
/// ```
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
/// let handle = tokio::runtime::Handle::current();
/// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
/// let mut intervals = monitor.intervals();
/// let mut next_interval = || intervals.next().unwrap();
///
/// let interval = next_interval();
/// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
/// }
/// ```
pub mean_poll_duration_worker_min: Duration,

/// The average duration of a single invocation of poll on a task on the
/// worker with the highest mean poll duration.
///
/// This average is an exponentially-weighted moving average of the duration
/// of task polls on the runtime worker whose mean poll duration is the
/// highest. If no tasks were polled, the value is zero.
///
/// ##### Examples
/// ```
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
/// let handle = tokio::runtime::Handle::current();
/// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
/// let mut intervals = monitor.intervals();
/// let mut next_interval = || intervals.next().unwrap();
///
/// let interval = next_interval();
/// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
/// }
/// ```
pub mean_poll_duration_worker_max: Duration,

/// The number of times worker threads unparked but performed no work before parking again.
///
/// The worker no-op count increases by one each time the worker unparks the thread but finds
Expand Down Expand Up @@ -306,14 +368,10 @@ pub struct RuntimeMetrics {
/// }.await;
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
/// assert_eq!(interval.total_steal_count, 1);
/// assert_eq!(interval.min_steal_count, 0);
/// assert_eq!(interval.max_steal_count, 1);
/// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
/// assert_eq!(interval.total_steal_count, 0);
/// assert_eq!(interval.min_steal_count, 0);
/// assert_eq!(interval.max_steal_count, 0);
/// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
/// }
///
/// async fn flush_metrics() {
Expand Down Expand Up @@ -395,14 +453,10 @@ pub struct RuntimeMetrics {
/// }.await;
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
/// assert_eq!(interval.total_steal_operations, 1);
/// assert_eq!(interval.min_steal_operations, 0);
/// assert_eq!(interval.max_steal_operations, 1);
/// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
///
/// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
/// assert_eq!(interval.total_steal_operations, 0);
/// assert_eq!(interval.min_steal_operations, 0);
/// assert_eq!(interval.max_steal_operations, 0);
/// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
/// }
///
/// async fn flush_metrics() {
Expand Down Expand Up @@ -1091,6 +1145,7 @@ impl RuntimeIntervals {
min_polls_count: u64::MAX,
min_busy_duration: Duration::from_secs(1000000000),
min_local_queue_depth: usize::MAX,
mean_poll_duration_worker_min: Duration::MAX,
budget_forced_yield_count: budget_forced_yields - self.budget_forced_yield_count,
io_driver_ready_count: io_driver_ready_events - self.io_driver_ready_count,
..Default::default()
Expand All @@ -1105,6 +1160,13 @@ impl RuntimeIntervals {
worker.probe(&self.runtime, &mut metrics);
}

if metrics.total_polls_count == 0 {
debug_assert_eq!(metrics.mean_poll_duration, Duration::default());

metrics.mean_poll_duration_worker_max = Duration::default();
metrics.mean_poll_duration_worker_min = Duration::default();
}

metrics
}
}
Expand Down Expand Up @@ -1223,6 +1285,9 @@ impl Worker {
}};
}

let mut worker_polls_count = self.total_polls_count;
let total_polls_count = metrics.total_polls_count;

metric!(
total_park_count,
max_park_count,
Expand Down Expand Up @@ -1272,6 +1337,33 @@ impl Worker {
worker_total_busy_duration
);

// Get the number of polls since last probe
worker_polls_count = self.total_polls_count - worker_polls_count;

// Update the mean task poll duration if there were polls
if worker_polls_count > 0 {
let val = rt.worker_mean_poll_time(self.worker);

if val > metrics.mean_poll_duration_worker_max {
metrics.mean_poll_duration_worker_max = val;
}

if val < metrics.mean_poll_duration_worker_min {
metrics.mean_poll_duration_worker_min = val;
}

// First, scale the running mean down by the fraction of polls that were
// already counted before this worker was probed
let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;

// Add the scaled current worker's mean poll duration
let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
mean += val.as_nanos() as f64 * ratio;

metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
}


// Local scheduled tasks is an absolute value

let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
Expand Down

0 comments on commit ceb1b1d

Please sign in to comment.