diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f3f206ce1f2..9c536141996 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -27,6 +27,7 @@ pub(crate) struct Spawner { #[derive(Default)] pub(crate) struct SpawnerMetrics { num_threads: AtomicUsize, + num_idle_threads: AtomicUsize, queue_depth: AtomicUsize, } @@ -35,6 +36,10 @@ impl SpawnerMetrics { self.num_threads.load(Ordering::Relaxed) } + fn num_idle_threads(&self) -> usize { + self.num_idle_threads.load(Ordering::Relaxed) + } + cfg_metrics! { fn queue_depth(&self) -> usize { self.queue_depth.load(Ordering::Relaxed) @@ -49,6 +54,14 @@ impl SpawnerMetrics { self.num_threads.fetch_sub(1, Ordering::Relaxed); } + fn inc_num_idle_threads(&self) { + self.num_idle_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_idle_threads(&self) -> usize { + self.num_idle_threads.fetch_sub(1, Ordering::Relaxed) + } + fn inc_queue_depth(&self) { self.queue_depth.fetch_add(1, Ordering::Relaxed); } @@ -89,7 +102,6 @@ struct Inner { struct Shared { queue: VecDeque, - num_idle: u32, num_notify: u32, shutdown: bool, shutdown_tx: Option, @@ -202,7 +214,6 @@ impl BlockingPool { inner: Arc::new(Inner { shared: Mutex::new(Shared { queue: VecDeque::new(), - num_idle: 0, num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), @@ -389,7 +400,7 @@ impl Spawner { shared.queue.push_back(task); self.inner.metrics.inc_queue_depth(); - if shared.num_idle == 0 { + if self.inner.metrics.num_idle_threads() == 0 { // No threads are able to process the task. if self.inner.metrics.num_threads() == self.inner.thread_cap { @@ -429,7 +440,7 @@ impl Spawner { // exactly. Thread libraries may generate spurious // wakeups, this counter is used to keep us in a // consistent state. - shared.num_idle -= 1; + self.inner.metrics.dec_num_idle_threads(); shared.num_notify += 1; self.inner.condvar.notify_one(); } @@ -466,6 +477,10 @@ cfg_metrics! { self.inner.metrics.num_threads() } + pub(crate) fn num_idle_threads(&self) -> usize { + self.inner.metrics.num_idle_threads() + } + pub(crate) fn queue_depth(&self) -> usize { self.inner.metrics.queue_depth() } @@ -498,7 +513,7 @@ impl Inner { } // IDLE - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); while !shared.shutdown { let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); @@ -543,7 +558,7 @@ impl Inner { // Work was produced, and we "took" it (by decrementing num_notify). // This means that num_idle was decremented once for our wakeup. // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); // NOTE: Technically we should also do num_notify++ and notify again, // but since we're shutting down anyway, that won't be necessary. break; @@ -556,10 +571,10 @@ impl Inner { // num_idle should now be tracked exactly, panic // with a descriptive message if it is not the // case. - shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); + let prev_idle = self.metrics.dec_num_idle_threads(); + if prev_idle < self.metrics.num_idle_threads() { + panic!("num_idle_threads underflowed on thread exit") + } if shared.shutdown && self.metrics.num_threads() == 0 { self.condvar.notify_one(); diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index cda162bb0c0..dee14a45729 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -68,6 +68,30 @@ impl RuntimeMetrics { self.handle.inner.num_blocking_threads() } + /// Returns the number of idle threads, which hve spawned by the runtime + /// for `spawn_blocking` calls. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_idle_blocking_threads(); + /// println!("Runtime has {} idle blocking thread pool threads", n); + /// } + /// ``` + pub fn num_idle_blocking_threads(&self) -> usize { + self.handle.inner.num_idle_blocking_threads() + } + /// Returns the number of tasks scheduled from **outside** of the runtime. /// /// The remote schedule count starts at zero when the runtime is created and diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index f466a83e9e4..3c739bfa430 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -429,6 +429,10 @@ cfg_metrics! { self.blocking_spawner.num_threads() } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } + pub(crate) fn blocking_queue_depth(&self) -> usize { self.blocking_spawner.queue_depth() } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index eabf5a77248..b991c89f00c 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -119,6 +119,14 @@ cfg_rt! { } } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_idle_blocking_threads(), + } + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { match self { Handle::CurrentThread(handle) => handle.scheduler_metrics(), diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index c0815231bb2..69a4620c127 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -65,6 +65,10 @@ cfg_metrics! { self.blocking_spawner.num_threads() } + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 47dea08ea4c..71a60e4dfde 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -23,6 +23,17 @@ fn num_blocking_threads() { assert_eq!(1, rt.metrics().num_blocking_threads()); } +#[test] +fn num_idle_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_idle_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + rt.block_on(async { + time::sleep(Duration::from_millis(5)).await; + }); + assert_eq!(1, rt.metrics().num_idle_blocking_threads()); +} + #[test] fn blocking_queue_depth() { let rt = tokio::runtime::Builder::new_current_thread()