Skip to content

Commit

Permalink
Use global threadpool unless we exceed WORKER_SPAWN_MAX_COUNT. This…
Browse files Browse the repository at this point in the history
… is expected to play more nicely

with other uses of rayon in the same process (avoiding excess parallelism).

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
  • Loading branch information
daira committed Aug 15, 2021
1 parent 6916c3b commit 1815042
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Expand Up @@ -7,10 +7,11 @@ and this project adheres to Rust's notion of

## [Unreleased]
### Added
- `BELLMAN_NUM_CPUS` environment variable, which can be used to control the
number of logical CPUs that `bellman` will use when the (default) `multicore`
feature flag is enabled. The default (which has not changed) is to use the
`num_cpus` crate to determine the number of logical CPUs.
- `bellman` now uses `rayon` for multithreading when the (default) `multicore`
feature flag is enabled. This means that, when this flag is enabled, the
`RAYON_NUM_THREADS` environment variable controls the number of threads that
`bellman` will use. The default, which has not changed, is to use the same
number of threads as logical CPUs.
- `bellman::multicore::Waiter`

### Changed
Expand Down
45 changes: 22 additions & 23 deletions src/multicore.rs
Expand Up @@ -4,30 +4,25 @@

#[cfg(feature = "multicore")]
mod implementation {
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_channel::{bounded, Receiver};
use lazy_static::lazy_static;
use log::{error, trace};
use num_cpus;
use rayon::current_num_threads;

static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0);

lazy_static! {
static ref NUM_CPUS: usize = env::var("BELLMAN_NUM_CPUS")
.map_err(|_| ())
.and_then(|num| num.parse().map_err(|_| ()))
.unwrap_or_else(|_| num_cpus::get());
// See Worker::compute below for a description of this.
static ref WORKER_SPAWN_MAX_COUNT: usize = *NUM_CPUS * 4;
pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(*NUM_CPUS)
static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4;
static ref OVERFLOW_THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(current_num_threads())
.build()
.unwrap();
}

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Worker {}

impl Worker {
Expand All @@ -36,7 +31,7 @@ mod implementation {
}

pub fn log_num_cpus(&self) -> u32 {
log2_floor(*NUM_CPUS)
log2_floor(current_num_threads())
}

pub fn compute<F, R>(&self, f: F) -> Waiter<R>
Expand All @@ -45,7 +40,6 @@ mod implementation {
R: Send + 'static,
{
let (sender, receiver) = bounded(1);
let thread_index = THREAD_POOL.current_thread_index().unwrap_or(0);

// We keep track here of how many times spawn has been called.
// It can be called without limit, each time, putting a
Expand All @@ -63,17 +57,20 @@ mod implementation {
// install call to help clear the growing work queue and
// minimize the chances of memory exhaustion.
if previous_count > *WORKER_SPAWN_MAX_COUNT {
THREAD_POOL.install(move || {
trace!("[{}] switching to install to help clear backlog[current threads {}, threads requested {}]",
thread_index,
THREAD_POOL.current_num_threads(),
WORKER_SPAWN_COUNTER.load(Ordering::SeqCst));
let thread_index = rayon::current_thread_index().unwrap_or(0);
OVERFLOW_THREAD_POOL.install(move || {
trace!("[{}, {}] switching to install to help clear backlog [threads: current {}, overflow {}, requested {}]",
thread_index,
OVERFLOW_THREAD_POOL.current_thread_index().unwrap_or(0),
current_num_threads(),
OVERFLOW_THREAD_POOL.current_num_threads(),
WORKER_SPAWN_COUNTER.load(Ordering::SeqCst));
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
});
} else {
THREAD_POOL.spawn(move || {
rayon::spawn(move || {
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
Expand All @@ -88,13 +85,14 @@ mod implementation {
F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send,
R: Send,
{
let chunk_size = if elements < *NUM_CPUS {
let num_threads = current_num_threads();
let chunk_size = if elements < num_threads {
1
} else {
elements / *NUM_CPUS
elements / num_threads
};

THREAD_POOL.scope(|scope| f(scope, chunk_size))
rayon::scope(|scope| f(scope, chunk_size))
}
}

Expand All @@ -105,8 +103,9 @@ mod implementation {
impl<T> Waiter<T> {
/// Wait for the result.
pub fn wait(&self) -> T {
if THREAD_POOL.current_thread_index().is_some() {
let msg = "wait() cannot be called from within the worker thread pool since that would lead to deadlocks";
// This will be Some if this thread is in either the global or overflow thread pool.
if rayon::current_thread_index().is_some() {
let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks";
// panic! doesn't necessarily kill the process, so we log as well.
error!("{}", msg);
panic!("{}", msg);
Expand Down

0 comments on commit 1815042

Please sign in to comment.