Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rt): read default worker thread from env #4250

Merged
merged 17 commits into from Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions tokio/src/lib.rs
Expand Up @@ -174,12 +174,15 @@
//! swapping the currently running task on each thread. However, this kind of
//! swapping can only happen at `.await` points, so code that spends a long time
//! without reaching an `.await` will prevent other tasks from running. To
//! combat this, Tokio provides two kinds of threads: Core threads and blocking
//! threads. The core threads are where all asynchronous code runs, and Tokio
//! will by default spawn one for each CPU core. The blocking threads are
//! spawned on demand, can be used to run blocking code that would otherwise
//! block other tasks from running and are kept alive when not used for a certain
//! amount of time which can be configured with [`thread_keep_alive`].
//! combat this, Tokio provides two kinds of threads: Core threads and blocking threads.
//!
//! The core threads are where all asynchronous code runs, and Tokio will by default
//! spawn one for each CPU core. You can use the environment variable `TOKIO_WORKER_THREADS`
//! to override the default value.
//!
//! The blocking threads are spawned on demand, can be used to run blocking code
//! that would otherwise block other tasks from running and are kept alive when
//! not used for a certain amount of time which can be configured with [`thread_keep_alive`].
//! Since it is not possible for Tokio to swap out blocking tasks, like it
//! can do with asynchronous code, the upper limit on the number of blocking
//! threads is very large. These limits can be configured on the [`Builder`].
Expand Down
37 changes: 36 additions & 1 deletion tokio/src/runtime/builder.rs
Expand Up @@ -6,6 +6,10 @@ use std::fmt;
use std::io;
use std::time::Duration;

/// This key is used to specify the default worker threads for multi-thread runtime.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS";

/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
Expand Down Expand Up @@ -182,6 +186,7 @@ cfg_unstable! {

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;

#[derive(Clone, Copy)]
pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Expand Down Expand Up @@ -237,8 +242,9 @@ impl Builder {
// The clock starts not-paused
start_paused: false,

// Read from environment variable first in multi-threaded mode.
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
worker_threads: Self::default_worker_threads(kind),

max_blocking_threads: 512,

Expand Down Expand Up @@ -304,6 +310,8 @@ impl Builder {
/// This can be any number above 0 though it is advised to keep this value
/// on the smaller side.
///
/// This will override the value read from environment variable "TOKIO_WORKER_THREADS".
///
/// # Default
///
/// The default value is the number of cores available to the system.
Expand Down Expand Up @@ -918,6 +926,33 @@ impl Builder {
blocking_pool,
))
}

#[cfg(any(not(feature = "rt-multi-thread"), tokio_wasi))]
fn default_worker_threads(_: Kind) -> Option<usize> {
None
}

#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
fn default_worker_threads(kind: Kind) -> Option<usize> {
match kind {
// Always return None if using current thread
Kind::CurrentThread => return None,
Kind::MultiThread => {}
};
match std::env::var(ENV_WORKER_THREADS) {
Ok(s) => {
let n: usize = s.parse().unwrap_or_else(|e| {
panic!(
"{} must be usize, error: {}, value: {}",
ENV_WORKER_THREADS, e, s
)
PureWhiteWu marked this conversation as resolved.
Show resolved Hide resolved
});
assert!(n > 0, "{} cannot be set to 0", ENV_WORKER_THREADS);
Some(n)
}
Err(_) => None,
PureWhiteWu marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

cfg_io_driver! {
Expand Down