Skip to content

Commit

Permalink
idle blocking threads
Browse files Browse the repository at this point in the history
  • Loading branch information
duarten committed Nov 4, 2022
1 parent 7eaf5ad commit 7de50a6
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 10 deletions.
35 changes: 25 additions & 10 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -27,6 +27,7 @@ pub(crate) struct Spawner {
#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
num_idle_threads: AtomicUsize,
queue_depth: AtomicUsize,
}

Expand All @@ -35,6 +36,10 @@ impl SpawnerMetrics {
self.num_threads.load(Ordering::Relaxed)
}

fn num_idle_threads(&self) -> usize {
self.num_idle_threads.load(Ordering::Relaxed)
}

cfg_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
Expand All @@ -49,6 +54,14 @@ impl SpawnerMetrics {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
}

fn inc_num_idle_threads(&self) {
self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_idle_threads(&self) -> usize {
self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
}

fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}
Expand Down Expand Up @@ -89,7 +102,6 @@ struct Inner {

struct Shared {
queue: VecDeque<Task>,
num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
Expand Down Expand Up @@ -202,7 +214,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
Expand Down Expand Up @@ -389,7 +400,7 @@ impl Spawner {
shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();

if shared.num_idle == 0 {
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.

if self.inner.metrics.num_threads() == self.inner.thread_cap {
Expand Down Expand Up @@ -429,7 +440,7 @@ impl Spawner {
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
self.inner.metrics.dec_num_idle_threads();
shared.num_notify += 1;
self.inner.condvar.notify_one();
}
Expand Down Expand Up @@ -466,6 +477,10 @@ cfg_metrics! {
self.inner.metrics.num_threads()
}

pub(crate) fn num_idle_threads(&self) -> usize {
self.inner.metrics.num_idle_threads()
}

pub(crate) fn queue_depth(&self) -> usize {
self.inner.metrics.queue_depth()
}
Expand Down Expand Up @@ -498,7 +513,7 @@ impl Inner {
}

// IDLE
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
Expand Down Expand Up @@ -543,7 +558,7 @@ impl Inner {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
Expand All @@ -556,10 +571,10 @@ impl Inner {
// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
shared.num_idle = shared
.num_idle
.checked_sub(1)
.expect("num_idle underflowed on thread exit");
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}

if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
Expand Down
24 changes: 24 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -68,6 +68,30 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of idle threads, which hve spawned by the runtime
/// for `spawn_blocking` calls.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_idle_blocking_threads();
/// println!("Runtime has {} idle blocking thread pool threads", n);
/// }
/// ```
pub fn num_idle_blocking_threads(&self) -> usize {
self.handle.inner.num_idle_blocking_threads()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count starts at zero when the runtime is created and
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -429,6 +429,10 @@ cfg_metrics! {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -119,6 +119,14 @@ cfg_rt! {
}
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Expand Up @@ -65,6 +65,10 @@ cfg_metrics! {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
11 changes: 11 additions & 0 deletions tokio/tests/rt_metrics.rs
Expand Up @@ -23,6 +23,17 @@ fn num_blocking_threads() {
assert_eq!(1, rt.metrics().num_blocking_threads());
}

#[test]
fn num_idle_blocking_threads() {
let rt = current_thread();
assert_eq!(0, rt.metrics().num_idle_blocking_threads());
let _ = rt.block_on(rt.spawn_blocking(move || {}));
rt.block_on(async {
time::sleep(Duration::from_millis(5)).await;
});
assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}

#[test]
fn blocking_queue_depth() {
let rt = tokio::runtime::Builder::new_current_thread()
Expand Down

0 comments on commit 7de50a6

Please sign in to comment.