Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: add custom keep_alive functionality #2809

Merged
7 changes: 6 additions & 1 deletion tokio/src/runtime/blocking/pool.rs
Expand Up @@ -47,6 +47,9 @@ struct Inner {

// Maximum number of threads
thread_cap: usize,

// Customizable wait timeout
keep_alive: Duration,
}

struct Shared {
Expand Down Expand Up @@ -91,6 +94,7 @@ where
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);

BlockingPool {
spawner: Spawner {
Expand All @@ -110,6 +114,7 @@ impl BlockingPool {
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
}),
},
shutdown_rx,
Expand Down Expand Up @@ -258,7 +263,7 @@ impl Inner {
shared.num_idle += 1;

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

shared = lock_result.0;
let timeout_result = lock_result.1;
Expand Down
30 changes: 30 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -4,6 +4,7 @@ use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};

use std::fmt;
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
///
Expand Down Expand Up @@ -65,6 +66,9 @@ pub struct Builder {

/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
Expand Down Expand Up @@ -108,6 +112,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,

keep_alive: None,
}
}

Expand Down Expand Up @@ -375,6 +381,30 @@ impl Builder {
blocking_pool,
})
}

#[cfg(feature = "blocking")]
#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
/// Sets a custom timeout for a thread in the blocking pool.
///
/// By default, the timeout for a thread is set to 10 seconds. This can
/// be overriden using .blocking_keep_alive().
///
/// # Example
///
/// ```
/// # use tokio::runtime;
/// # use std::time::Duration;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .blocking_keep_alive(Duration::from_millis(100))
/// .build();
/// # }
/// ```
pub fn blocking_keep_alive(&mut self, duration: Duration) -> &mut Self {
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
self.keep_alive = Some(duration);
self
}
}

cfg_io_driver! {
Expand Down