diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index dd166109469..9c536141996 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle}; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; pub(crate) struct BlockingPool { @@ -23,6 +24,53 @@ pub(crate) struct Spawner { inner: Arc, } +#[derive(Default)] +pub(crate) struct SpawnerMetrics { + num_threads: AtomicUsize, + num_idle_threads: AtomicUsize, + queue_depth: AtomicUsize, +} + +impl SpawnerMetrics { + fn num_threads(&self) -> usize { + self.num_threads.load(Ordering::Relaxed) + } + + fn num_idle_threads(&self) -> usize { + self.num_idle_threads.load(Ordering::Relaxed) + } + + cfg_metrics! { + fn queue_depth(&self) -> usize { + self.queue_depth.load(Ordering::Relaxed) + } + } + + fn inc_num_threads(&self) { + self.num_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_threads(&self) { + self.num_threads.fetch_sub(1, Ordering::Relaxed); + } + + fn inc_num_idle_threads(&self) { + self.num_idle_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_idle_threads(&self) -> usize { + self.num_idle_threads.fetch_sub(1, Ordering::Relaxed) + } + + fn inc_queue_depth(&self) { + self.queue_depth.fetch_add(1, Ordering::Relaxed); + } + + fn dec_queue_depth(&self) { + self.queue_depth.fetch_sub(1, Ordering::Relaxed); + } +} + struct Inner { /// State shared between worker threads. shared: Mutex, @@ -47,12 +95,13 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, + + // Metrics about the pool. + metrics: SpawnerMetrics, } struct Shared { queue: VecDeque, - num_th: usize, - num_idle: u32, num_notify: u32, shutdown: bool, shutdown_tx: Option, @@ -165,8 +214,6 @@ impl BlockingPool { inner: Arc::new(Inner { shared: Mutex::new(Shared { queue: VecDeque::new(), - num_th: 0, - num_idle: 0, num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), @@ -181,6 +228,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + metrics: Default::default(), }), }, shutdown_rx, @@ -350,11 +398,12 @@ impl Spawner { } shared.queue.push_back(task); + self.inner.metrics.inc_queue_depth(); - if shared.num_idle == 0 { + if self.inner.metrics.num_idle_threads() == 0 { // No threads are able to process the task. - if shared.num_th == self.inner.thread_cap { + if self.inner.metrics.num_threads() == self.inner.thread_cap { // At max number of threads } else { assert!(shared.shutdown_tx.is_some()); @@ -365,11 +414,14 @@ impl Spawner { match self.spawn_thread(shutdown_tx, rt, id) { Ok(handle) => { - shared.num_th += 1; + self.inner.metrics.inc_num_threads(); shared.worker_thread_index += 1; shared.worker_threads.insert(id, handle); } - Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => { + Err(ref e) + if is_temporary_os_thread_error(e) + && self.inner.metrics.num_threads() > 0 => + { // OS temporarily failed to spawn a new thread. // The task will be picked up eventually by a currently // busy thread. @@ -388,7 +440,7 @@ impl Spawner { // exactly. Thread libraries may generate spurious // wakeups, this counter is used to keep us in a // consistent state. - shared.num_idle -= 1; + self.inner.metrics.dec_num_idle_threads(); shared.num_notify += 1; self.inner.condvar.notify_one(); } @@ -419,6 +471,22 @@ impl Spawner { } } +cfg_metrics! { + impl Spawner { + pub(crate) fn num_threads(&self) -> usize { + self.inner.metrics.num_threads() + } + + pub(crate) fn num_idle_threads(&self) -> usize { + self.inner.metrics.num_idle_threads() + } + + pub(crate) fn queue_depth(&self) -> usize { + self.inner.metrics.queue_depth() + } + } +} + // Tells whether the error when spawning a thread is temporary. #[inline] fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { @@ -437,6 +505,7 @@ impl Inner { 'main: loop { // BUSY while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); task.run(); @@ -444,7 +513,7 @@ impl Inner { } // IDLE - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); while !shared.shutdown { let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); @@ -478,6 +547,7 @@ impl Inner { if shared.shutdown { // Drain the queue while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); task.shutdown_or_run_if_mandatory(); @@ -488,7 +558,7 @@ impl Inner { // Work was produced, and we "took" it (by decrementing num_notify). // This means that num_idle was decremented once for our wakeup. // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); // NOTE: Technically we should also do num_notify++ and notify again, // but since we're shutting down anyway, that won't be necessary. break; @@ -496,17 +566,17 @@ impl Inner { } // Thread exit - shared.num_th -= 1; + self.metrics.dec_num_threads(); // num_idle should now be tracked exactly, panic // with a descriptive message if it is not the // case. - shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); + let prev_idle = self.metrics.dec_num_idle_threads(); + if prev_idle < self.metrics.num_idle_threads() { + panic!("num_idle_threads underflowed on thread exit") + } - if shared.shutdown && shared.num_th == 0 { + if shared.shutdown && self.metrics.num_threads() == 0 { self.condvar.notify_one(); } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 49c926302f5..dee14a45729 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -42,6 +42,56 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } + /// Returns the number of additional threads spawned by the runtime. + /// + /// The number of workers is set by configuring `max_blocking_threads` on + /// `runtime::Builder`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_blocking_threads(); + /// println!("Runtime has created {} threads", n); + /// } + /// ``` + pub fn num_blocking_threads(&self) -> usize { + self.handle.inner.num_blocking_threads() + } + + /// Returns the number of idle threads, which hve spawned by the runtime + /// for `spawn_blocking` calls. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_idle_blocking_threads(); + /// println!("Runtime has {} idle blocking thread pool threads", n); + /// } + /// ``` + pub fn num_idle_blocking_threads(&self) -> usize { + self.handle.inner.num_idle_blocking_threads() + } + /// Returns the number of tasks scheduled from **outside** of the runtime. /// /// The remote schedule count starts at zero when the runtime is created and @@ -446,6 +496,30 @@ impl RuntimeMetrics { pub fn worker_local_queue_depth(&self, worker: usize) -> usize { self.handle.inner.worker_local_queue_depth(worker) } + + /// Returns the number of tasks currently scheduled in the blocking + /// thread pool, spawned using `spawn_blocking`. + /// + /// This metric returns the **current** number of tasks pending in + /// blocking thread pool. As such, the returned value may increase + /// or decrease as new tasks are scheduled and processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.blocking_queue_depth(); + /// println!("{} tasks currently pending in the blocking thread pool", n); + /// } + /// ``` + pub fn blocking_queue_depth(&self) -> usize { + self.handle.inner.blocking_queue_depth() + } } cfg_net! { diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 6bcc1750a44..9297687023f 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -428,6 +428,18 @@ cfg_metrics! { assert_eq!(0, worker); &self.shared.worker_metrics } + + pub(crate) fn num_blocking_threads(&self) -> usize { + self.blocking_spawner.num_threads() + } + + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 18ac474fa75..b991c89f00c 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -111,6 +111,22 @@ cfg_rt! { } } + pub(crate) fn num_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_blocking_threads(), + } + } + + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_idle_blocking_threads(), + } + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { match self { Handle::CurrentThread(handle) => handle.scheduler_metrics(), @@ -142,6 +158,14 @@ cfg_rt! { Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker), } } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.blocking_queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.blocking_queue_depth(), + } + } } } } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 884f400bf00..69a4620c127 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -61,6 +61,14 @@ cfg_metrics! { self.shared.worker_metrics.len() } + pub(crate) fn num_blocking_threads(&self) -> usize { + self.blocking_spawner.num_threads() + } + + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + self.blocking_spawner.num_idle_threads() + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics } @@ -76,6 +84,10 @@ cfg_metrics! { pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.shared.worker_local_queue_depth(worker) } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index cffc117bce2..71a60e4dfde 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,6 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))] +use std::sync::{Arc, Mutex}; + use tokio::runtime::Runtime; use tokio::time::{self, Duration}; @@ -13,6 +15,55 @@ fn num_workers() { assert_eq!(2, rt.metrics().num_workers()); } +#[test] +fn num_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + assert_eq!(1, rt.metrics().num_blocking_threads()); +} + +#[test] +fn num_idle_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_idle_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + rt.block_on(async { + time::sleep(Duration::from_millis(5)).await; + }); + assert_eq!(1, rt.metrics().num_idle_blocking_threads()); +} + +#[test] +fn blocking_queue_depth() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(1) + .build() + .unwrap(); + + assert_eq!(0, rt.metrics().blocking_queue_depth()); + + let ready = Arc::new(Mutex::new(())); + let guard = ready.lock().unwrap(); + + let ready_cloned = ready.clone(); + let wait_until_ready = move || { + let _unused = ready_cloned.lock().unwrap(); + }; + + let h1 = rt.spawn_blocking(wait_until_ready.clone()); + let h2 = rt.spawn_blocking(wait_until_ready); + assert!(rt.metrics().blocking_queue_depth() > 0); + + drop(guard); + + let _ = rt.block_on(h1); + let _ = rt.block_on(h2); + + assert_eq!(0, rt.metrics().blocking_queue_depth()); +} + #[test] fn remote_schedule_count() { use std::thread;