Skip to content

Commit

Permalink
runtime: add custom keep_alive functionality (#2809)
Browse files Browse the repository at this point in the history
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
Fixes: #2585
  • Loading branch information
blasrodri and hawkw committed Sep 7, 2020
1 parent 38ec484 commit f4d6ed0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
10 changes: 9 additions & 1 deletion tokio/src/runtime/blocking/pool.rs
Expand Up @@ -47,6 +47,9 @@ struct Inner {

// Maximum number of threads
thread_cap: usize,

// Customizable wait timeout
keep_alive: Duration,
}

struct Shared {
Expand Down Expand Up @@ -91,6 +94,10 @@ where
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
#[cfg(feature = "blocking")]
let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
#[cfg(not(feature = "blocking"))]
let keep_alive = KEEP_ALIVE;

BlockingPool {
spawner: Spawner {
Expand All @@ -110,6 +117,7 @@ impl BlockingPool {
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
}),
},
shutdown_rx,
Expand Down Expand Up @@ -258,7 +266,7 @@ impl Inner {
shared.num_idle += 1;

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

shared = lock_result.0;
let timeout_result = lock_result.1;
Expand Down
34 changes: 34 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -4,6 +4,8 @@ use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(feature = "blocking")]
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
///
Expand Down Expand Up @@ -65,6 +67,11 @@ pub struct Builder {

/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,

#[cfg(feature = "blocking")]
#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
Expand Down Expand Up @@ -108,6 +115,9 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,

#[cfg(feature = "blocking")]
keep_alive: None,
}
}

Expand Down Expand Up @@ -375,6 +385,30 @@ impl Builder {
blocking_pool,
})
}

#[cfg(feature = "blocking")]
#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
/// Sets a custom timeout for a thread in the blocking pool.
///
/// By default, the timeout for a thread is set to 10 seconds. This can
/// be overriden using .thread_keep_alive().
///
/// # Example
///
/// ```
/// # use tokio::runtime;
/// # use std::time::Duration;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .thread_keep_alive(Duration::from_millis(100))
/// .build();
/// # }
/// ```
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
self.keep_alive = Some(duration);
self
}
}

cfg_io_driver! {
Expand Down

0 comments on commit f4d6ed0

Please sign in to comment.