From d90bb2148f933897b18fe4b9e977429a9054f47e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 25 Mar 2019 16:36:27 +0100 Subject: [PATCH 01/23] feat: allow controll of the maximum number of cpus using BELLMAN_NUM_CPUS (cherry picked from commit 37686f82496cc388f32b082527d6d1b1ccc3e7de) --- src/multicore.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/multicore.rs b/src/multicore.rs index ba69b5f33..dd29e5559 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -7,6 +7,8 @@ #[cfg(feature = "multicore")] mod implementation { + use std::env; + use crossbeam::{self, thread::Scope}; use futures::{Future, IntoFuture, Poll}; use futures_cpupool::{CpuFuture, CpuPool}; @@ -30,7 +32,17 @@ mod implementation { } pub fn new() -> Worker { - Self::new_with_cpus(num_cpus::get()) + let cpus = if let Ok(num) = env::var("BELLMAN_NUM_CPUS") { + if let Ok(num) = num.parse() { + num + } else { + num_cpus::get() + } + } else { + num_cpus::get() + }; + + Self::new_with_cpus(cpus) } pub fn log_num_cpus(&self) -> u32 { From 2b7a58bc3756e0755a7c680d4631d76edb17d07c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 8 Jun 2020 13:28:57 +0200 Subject: [PATCH 02/23] feat(groth16): control parallel proving threads Uses the existing env variable to restrict the number of threads used for parallel proving. Cleaned up version of #81 (cherry picked from commit 0c1868d540978f8d4cc25822070bcdce0e16bcea) pick: - Only the changes to bellman::multicore. - Added lazy_static and rayon dependencies. --- Cargo.toml | 5 ++++- src/multicore.rs | 29 ++++++++++++++++++----------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c4a1e662e..47f49062f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,9 @@ rand_core = "0.6" byteorder = "1" subtle = "2.2.1" +lazy_static = { version = "1.4.0", optional = true } +rayon = { version = "1.3.0", optional = true } + [dev-dependencies] bls12_381 = "0.5" criterion = "0.3" @@ -36,7 +39,7 @@ sha2 = "0.9" [features] groth16 = ["pairing"] -multicore = ["futures-cpupool", "crossbeam", "num_cpus"] +multicore = ["futures-cpupool", "crossbeam", "lazy_static", "num_cpus", "rayon"] default = ["groth16", "multicore"] [[test]] diff --git a/src/multicore.rs b/src/multicore.rs index dd29e5559..e76174a55 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -12,8 +12,25 @@ mod implementation { use crossbeam::{self, thread::Scope}; use futures::{Future, IntoFuture, Poll}; use futures_cpupool::{CpuFuture, CpuPool}; + use lazy_static::lazy_static; use num_cpus; + lazy_static! { + static ref NUM_CPUS: usize = if let Ok(num) = env::var("BELLMAN_NUM_CPUS") { + if let Ok(num) = num.parse() { + num + } else { + num_cpus::get() + } + } else { + num_cpus::get() + }; + pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() + .num_threads(*NUM_CPUS) + .build() + .unwrap(); + } + #[derive(Clone)] pub struct Worker { cpus: usize, @@ -32,17 +49,7 @@ mod implementation { } pub fn new() -> Worker { - let cpus = if let Ok(num) = env::var("BELLMAN_NUM_CPUS") { - if let Ok(num) = num.parse() { - num - } else { - num_cpus::get() - } - } else { - num_cpus::get() - }; - - Self::new_with_cpus(cpus) + Self::new_with_cpus(*NUM_CPUS) } pub fn log_num_cpus(&self) -> u32 { From 4970b8fb033c0698f5530071f2eba84de48d5ff8 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 29 Sep 2020 18:22:52 +0200 Subject: [PATCH 03/23] fix: only spawn threads into a pool, to avoid unlimited thread spawns (cherry picked from commit dca4c5aeb5d34f60e82e203cdefaefeb617f71f8) pick: Only the changes to bellman::multicore. --- Cargo.toml | 3 +-- src/multicore.rs | 36 +++++++++++------------------------- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 47f49062f..dade427fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ futures = "0.1" futures-cpupool = { version = "0.1", optional = true } group = "0.10" num_cpus = { version = "1", optional = true } -crossbeam = { version = "0.7", optional = true } pairing = { version = "0.20", optional = true } rand_core = "0.6" byteorder = "1" @@ -39,7 +38,7 @@ sha2 = "0.9" [features] groth16 = ["pairing"] -multicore = ["futures-cpupool", "crossbeam", "lazy_static", "num_cpus", "rayon"] +multicore = ["futures-cpupool", "lazy_static", "num_cpus", "rayon"] default = ["groth16", "multicore"] [[test]] diff --git a/src/multicore.rs b/src/multicore.rs index e76174a55..b22815e9e 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -1,6 +1,6 @@ //! An interface for dealing with the kinds of parallel computations involved in //! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and -//! [`crossbeam`] but may be extended in the future to allow for various +//! [`rayon`] but may be extended in the future to allow for various //! parallelism strategies. //! //! [`CpuPool`]: futures_cpupool::CpuPool @@ -9,7 +9,6 @@ mod implementation { use std::env; - use crossbeam::{self, thread::Scope}; use futures::{Future, IntoFuture, Poll}; use futures_cpupool::{CpuFuture, CpuPool}; use lazy_static::lazy_static; @@ -29,31 +28,19 @@ mod implementation { .num_threads(*NUM_CPUS) .build() .unwrap(); + static ref CPU_POOL: CpuPool = CpuPool::new(*NUM_CPUS); } #[derive(Clone)] - pub struct Worker { - cpus: usize, - pool: CpuPool, - } + pub struct Worker {} impl Worker { - // We don't expose this outside the library so that - // all `Worker` instances have the same number of - // CPUs configured. - pub(crate) fn new_with_cpus(cpus: usize) -> Worker { - Worker { - cpus, - pool: CpuPool::new(cpus), - } - } - pub fn new() -> Worker { - Self::new_with_cpus(*NUM_CPUS) + Worker {} } pub fn log_num_cpus(&self) -> u32 { - log2_floor(self.cpus) + log2_floor(*NUM_CPUS) } pub fn compute(&self, f: F) -> WorkerFuture @@ -65,23 +52,22 @@ mod implementation { R::Error: Send + 'static, { WorkerFuture { - future: self.pool.spawn_fn(f), + future: CPU_POOL.spawn_fn(f), } } pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R where - F: FnOnce(&Scope<'a>, usize) -> R, + F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send, + R: Send, { - let chunk_size = if elements < self.cpus { + let chunk_size = if elements < *NUM_CPUS { 1 } else { - elements / self.cpus + elements / *NUM_CPUS }; - // TODO: Handle case where threads fail - crossbeam::scope(|scope| f(scope, chunk_size)) - .expect("Threads aren't allowed to fail yet") + THREAD_POOL.scope(|scope| f(scope, chunk_size)) } } From b4276a3d6265443d80366088a6173307141dba2f Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sat, 3 Oct 2020 00:11:29 +0200 Subject: [PATCH 04/23] feat: limited thread spawning and unroll recursion in multiexp (cherry picked from commit c55d5e4157279f63aeb3e379cdc684c1ec22a25e) Changes during pick: - Excluded GPU-related changes (not upstreamed yet). - Added fix for change to Field::double API. - Fixed no-multicore Worker impl. Co-authored-by: Jack Grigg --- Cargo.toml | 5 +- benches/slow.rs | 1 - src/groth16/prover.rs | 2 - src/multicore.rs | 84 ++++++++++++-------- src/multiexp.rs | 175 ++++++++++++++++++++---------------------- 5 files changed, 135 insertions(+), 132 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dade427fc..fa6bdda7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,6 @@ edition = "2018" bitvec = "0.22" blake2s_simd = "0.5" ff = "0.10" -futures = "0.1" -futures-cpupool = { version = "0.1", optional = true } group = "0.10" num_cpus = { version = "1", optional = true } pairing = { version = "0.20", optional = true } @@ -25,6 +23,7 @@ rand_core = "0.6" byteorder = "1" subtle = "2.2.1" +crossbeam-channel = { version = "0.4.4", optional = true } lazy_static = { version = "1.4.0", optional = true } rayon = { version = "1.3.0", optional = true } @@ -38,7 +37,7 @@ sha2 = "0.9" [features] groth16 = ["pairing"] -multicore = ["futures-cpupool", "lazy_static", "num_cpus", "rayon"] +multicore = ["crossbeam-channel", "lazy_static", "num_cpus", "rayon"] default = ["groth16", "multicore"] [[test]] diff --git a/benches/slow.rs b/benches/slow.rs index 542f45c21..b36099ecb 100644 --- a/benches/slow.rs +++ b/benches/slow.rs @@ -5,7 +5,6 @@ use bellman::{ use bls12_381::{Bls12, Scalar}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use ff::{Field, PrimeFieldBits}; -use futures::Future; use group::{Curve, Group}; use pairing::Engine; use rand_core::SeedableRng; diff --git a/src/groth16/prover.rs b/src/groth16/prover.rs index 20f348756..197e5dcbe 100644 --- a/src/groth16/prover.rs +++ b/src/groth16/prover.rs @@ -2,8 +2,6 @@ use rand_core::RngCore; use std::ops::{AddAssign, MulAssign}; use std::sync::Arc; -use futures::Future; - use ff::{Field, PrimeField, PrimeFieldBits}; use group::{prime::PrimeCurveAffine, Curve}; use pairing::Engine; diff --git a/src/multicore.rs b/src/multicore.rs index b22815e9e..92367cb53 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -9,8 +9,7 @@ mod implementation { use std::env; - use futures::{Future, IntoFuture, Poll}; - use futures_cpupool::{CpuFuture, CpuPool}; + use crossbeam_channel::{bounded, Receiver}; use lazy_static::lazy_static; use num_cpus; @@ -28,7 +27,6 @@ mod implementation { .num_threads(*NUM_CPUS) .build() .unwrap(); - static ref CPU_POOL: CpuPool = CpuPool::new(*NUM_CPUS); } #[derive(Clone)] @@ -43,17 +41,18 @@ mod implementation { log2_floor(*NUM_CPUS) } - pub fn compute(&self, f: F) -> WorkerFuture + pub fn compute(&self, f: F) -> Waiter where F: FnOnce() -> R + Send + 'static, - R: IntoFuture + 'static, - R::Future: Send + 'static, - R::Item: Send + 'static, - R::Error: Send + 'static, + R: Send + 'static, { - WorkerFuture { - future: CPU_POOL.spawn_fn(f), - } + let (sender, receiver) = bounded(1); + THREAD_POOL.spawn(move || { + let res = f(); + sender.send(res).unwrap(); + }); + + Waiter { receiver } } pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R @@ -71,16 +70,22 @@ mod implementation { } } - pub struct WorkerFuture { - future: CpuFuture, + pub struct Waiter { + receiver: Receiver, } - impl Future for WorkerFuture { - type Item = T; - type Error = E; + impl Waiter { + /// Wait for the result. + pub fn wait(&self) -> T { + self.receiver.recv().unwrap() + } + + /// One off sending. + pub fn done(val: T) -> Self { + let (sender, receiver) = bounded(1); + sender.send(val).unwrap(); - fn poll(&mut self) -> Poll { - self.future.poll() + Waiter { receiver } } } @@ -111,8 +116,6 @@ mod implementation { #[cfg(not(feature = "multicore"))] mod implementation { - use futures::{future, Future, IntoFuture, Poll}; - #[derive(Clone)] pub struct Worker; @@ -125,15 +128,12 @@ mod implementation { 0 } - pub fn compute(&self, f: F) -> R::Future + pub fn compute(&self, f: F) -> Waiter where F: FnOnce() -> R + Send + 'static, - R: IntoFuture + 'static, - R::Future: Send + 'static, - R::Item: Send + 'static, - R::Error: Send + 'static, + R: Send + 'static, { - f().into_future() + Waiter::done(f()) } pub fn scope(&self, elements: usize, f: F) -> R @@ -144,16 +144,19 @@ mod implementation { } } - pub struct WorkerFuture { - future: future::FutureResult, + pub struct Waiter { + val: Option, } - impl Future for WorkerFuture { - type Item = T; - type Error = E; + impl Waiter { + /// Wait for the result. + pub fn wait(&mut self) -> T { + self.val.take().unwrap() + } - fn poll(&mut self) -> Poll { - self.future.poll() + /// One off sending. + pub fn done(val: T) -> Self { + Waiter { val: Some(val) } } } @@ -164,6 +167,21 @@ mod implementation { f(self); } } + + /// A fake rayon ParallelIterator that is just a serial iterator. + pub(crate) trait FakeParallelIterator { + type Iter: Iterator; + type Item: Send; + fn into_par_iter(self) -> Self::Iter; + } + + impl FakeParallelIterator for core::ops::Range { + type Iter = Self; + type Item = u32; + fn into_par_iter(self) -> Self::Iter { + self + } + } } pub use self::implementation::*; diff --git a/src/multiexp.rs b/src/multiexp.rs index 4aa235882..3089e7c44 100644 --- a/src/multiexp.rs +++ b/src/multiexp.rs @@ -1,13 +1,18 @@ -use super::multicore::Worker; +use super::multicore::{Waiter, Worker}; use bitvec::vec::BitVec; use ff::{FieldBits, PrimeField, PrimeFieldBits}; -use futures::Future; use group::prime::{PrimeCurve, PrimeCurveAffine}; use std::io; use std::iter; use std::ops::AddAssign; use std::sync::Arc; +#[cfg(feature = "multicore")] +use rayon::prelude::*; + +#[cfg(not(feature = "multicore"))] +use crate::multicore::FakeParallelIterator; + use super::SynthesisError; /// An object that builds a source of bases. @@ -146,14 +151,11 @@ impl DensityTracker { } fn multiexp_inner( - pool: &Worker, bases: S, density_map: D, exponents: Arc::ReprBits>>>, - mut skip: u32, c: u32, - handle_trivial: bool, -) -> Box> +) -> Result where for<'a> &'a Q: QueryDensity, D: Send + Sync + 'static + Clone + AsRef, @@ -162,100 +164,83 @@ where S: SourceBuilder<::Affine>, { // Perform this region of the multiexp - let this = { - let bases = bases.clone(); - let exponents = exponents.clone(); - let density_map = density_map.clone(); - - pool.compute(move || { - // Accumulate the result - let mut acc = G::identity(); - - // Build a source for the bases - let mut bases = bases.new(); - - // Create space for the buckets - let mut buckets = vec![G::identity(); (1 << c) - 1]; - - // Sort the bases into buckets - for (exp, density) in exponents.iter().zip(density_map.as_ref().iter()) { - if density { - let (exp_is_zero, exp_is_one) = { - let (first, rest) = exp.split_first().unwrap(); - let rest_unset = rest.not_any(); - (!*first && rest_unset, *first && rest_unset) - }; - - if exp_is_zero { + let this = move |bases: S, + density_map: D, + exponents: Arc::ReprBits>>>, + skip: u32| + -> Result<_, SynthesisError> { + // Accumulate the result + let mut acc = G::identity(); + + // Build a source for the bases + let mut bases = bases.new(); + + // Create space for the buckets + let mut buckets = vec![G::identity(); (1 << c) - 1]; + + // only the first round uses this + let handle_trivial = skip == 0; + + // Sort the bases into buckets + for (exp, density) in exponents.iter().zip(density_map.as_ref().iter()) { + if density { + let (exp_is_zero, exp_is_one) = { + let (first, rest) = exp.split_first().unwrap(); + let rest_unset = rest.not_any(); + (!*first && rest_unset, *first && rest_unset) + }; + + if exp_is_zero { + bases.skip(1)?; + } else if exp_is_one { + if handle_trivial { + acc.add_assign_from_source(&mut bases)?; + } else { bases.skip(1)?; - } else if exp_is_one { - if handle_trivial { - acc.add_assign_from_source(&mut bases)?; - } else { - bases.skip(1)?; - } + } + } else { + let exp = exp + .into_iter() + .by_val() + .skip(skip as usize) + .take(c as usize) + .enumerate() + .fold(0u64, |acc, (i, b)| acc + ((b as u64) << i)); + + if exp != 0 { + (&mut buckets[(exp - 1) as usize]).add_assign_from_source(&mut bases)?; } else { - let exp = exp - .into_iter() - .by_val() - .skip(skip as usize) - .take(c as usize) - .enumerate() - .fold(0u64, |acc, (i, b)| acc + ((b as u64) << i)); - - if exp != 0 { - (&mut buckets[(exp - 1) as usize]) - .add_assign_from_source(&mut bases)?; - } else { - bases.skip(1)?; - } + bases.skip(1)?; } } } + } - // Summation by parts - // e.g. 3a + 2b + 1c = a + - // (a) + b + - // ((a) + b) + c - let mut running_sum = G::identity(); - for exp in buckets.into_iter().rev() { - running_sum.add_assign(&exp); - acc.add_assign(&running_sum); - } + // Summation by parts + // e.g. 3a + 2b + 1c = a + + // (a) + b + + // ((a) + b) + c + let mut running_sum = G::identity(); + for exp in buckets.into_iter().rev() { + running_sum.add_assign(&exp); + acc.add_assign(&running_sum); + } - Ok(acc) - }) + Ok(acc) }; - skip += c; - - if skip >= G::Scalar::NUM_BITS { - // There isn't another region. - Box::new(this) - } else { - // There's another region more significant. Calculate and join it with - // this region recursively. - Box::new( - this.join(multiexp_inner( - pool, - bases, - density_map, - exponents, - skip, - c, - false, - )) - .map(move |(this, mut higher): (_, G)| { - for _ in 0..c { - higher = higher.double(); - } - - higher.add_assign(&this); - - higher - }), - ) - } + let parts = (0..G::Scalar::NUM_BITS) + .into_par_iter() + .step_by(c as usize) + .map(|skip| this(bases.clone(), density_map.clone(), exponents.clone(), skip)) + .collect::>>(); + + parts + .into_iter() + .rev() + .try_fold(G::identity(), |acc, part| { + part.map(|part| (0..c).fold(acc, |acc, _| acc.double()) + part) + }) } /// Perform multi-exponentiation. The caller is responsible for ensuring the @@ -265,7 +250,7 @@ pub fn multiexp( bases: S, density_map: D, exponents: Arc::ReprBits>>>, -) -> Box> +) -> Waiter> where for<'a> &'a Q: QueryDensity, D: Send + Sync + 'static + Clone + AsRef, @@ -286,7 +271,7 @@ where assert!(query_size == exponents.len()); } - multiexp_inner(pool, bases, density_map, exponents, 0, c, true) + pool.compute(move || multiexp_inner(bases, density_map, exponents, c)) } #[cfg(feature = "pairing")] @@ -328,11 +313,15 @@ fn test_with_bls12() { .collect::>(), ); + let now = std::time::Instant::now(); let naive: ::G1 = naive_multiexp(g.clone(), v.clone()); + println!("Naive: {}", now.elapsed().as_millis()); + let now = std::time::Instant::now(); let pool = Worker::new(); let fast = multiexp(&pool, (g, 0), FullDensity, v_bits).wait().unwrap(); + println!("Fast: {}", now.elapsed().as_millis()); assert_eq!(naive, fast); } From 4d857805bf7710a7c438cc386dd59368e7f720a1 Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Thu, 11 Feb 2021 14:50:54 +0100 Subject: [PATCH 05/23] chore: log error if waiter is used within a thread pool If a waiter is used from within a thread pool it might deadlock. Don't do that. (cherry picked from commit dc136066a939902c39a3b6abc5c6e67883db66b1) --- Cargo.toml | 3 ++- src/multicore.rs | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fa6bdda7b..edc70aca9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ subtle = "2.2.1" crossbeam-channel = { version = "0.4.4", optional = true } lazy_static = { version = "1.4.0", optional = true } +log = { version = "0.4.8", optional = true } rayon = { version = "1.3.0", optional = true } [dev-dependencies] @@ -37,7 +38,7 @@ sha2 = "0.9" [features] groth16 = ["pairing"] -multicore = ["crossbeam-channel", "lazy_static", "num_cpus", "rayon"] +multicore = ["crossbeam-channel", "lazy_static", "log", "num_cpus", "rayon"] default = ["groth16", "multicore"] [[test]] diff --git a/src/multicore.rs b/src/multicore.rs index 92367cb53..f48cc520f 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -11,6 +11,7 @@ mod implementation { use crossbeam_channel::{bounded, Receiver}; use lazy_static::lazy_static; + use log::error; use num_cpus; lazy_static! { @@ -77,6 +78,11 @@ mod implementation { impl Waiter { /// Wait for the result. pub fn wait(&self) -> T { + if THREAD_POOL.current_thread_index().is_some() { + // Calling `wait()` from within the worker thread pool can lead to dead logs + error!("The wait call should never be done inside the worker thread pool"); + debug_assert!(false); + } self.receiver.recv().unwrap() } From 7d7c453c20bd96daf3372abe7cc535f10c7bdd19 Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Fri, 12 Feb 2021 13:25:02 +0100 Subject: [PATCH 06/23] feat: fix bug in parallelism inherent in the Worker/Waiter model The Worker/Waiter model creates a Worker that requests that threads be spawned to complete some work. Since there is no bound to the number of requests for work, it's possible to exhaust memory with pending closures waiting to be processed. This was not an issue when 'everything was bounded by a single thread pool', however that setup has race conditions built into it. We have recently spent time splitting that model out, and this should be the final step required. This code places a limit on the number of requests that can be queued in an attempt to lessen the chance of memory exhaustion due to pending requests. It does this by placing a limit that is a simple multiple of the number of cores available (and in testing is far below observed crash criteria). When the number of requests for spawning new threads reaches that limit, instead of adding a new request, we block on execution to help clear the backlog. (cherry picked from commit 74947c014da198bb9060adf83d2d030c031246af) pick: Also includes log change from b481a3432f4acf126a8fb8f42dd3eda90b525948 --- src/multicore.rs | 56 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index f48cc520f..0e7cb7f72 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -8,12 +8,15 @@ #[cfg(feature = "multicore")] mod implementation { use std::env; + use std::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_channel::{bounded, Receiver}; use lazy_static::lazy_static; - use log::error; + use log::{error, trace}; use num_cpus; + static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0); + lazy_static! { static ref NUM_CPUS: usize = if let Ok(num) = env::var("BELLMAN_NUM_CPUS") { if let Ok(num) = num.parse() { @@ -24,6 +27,8 @@ mod implementation { } else { num_cpus::get() }; + // See Worker::compute below for a description of this. + static ref WORKER_SPAWN_MAX_COUNT: usize = *NUM_CPUS * 4; pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() .num_threads(*NUM_CPUS) .build() @@ -48,10 +53,51 @@ mod implementation { R: Send + 'static, { let (sender, receiver) = bounded(1); - THREAD_POOL.spawn(move || { - let res = f(); - sender.send(res).unwrap(); - }); + + let thread_index = if THREAD_POOL.current_thread_index().is_some() { + THREAD_POOL.current_thread_index().unwrap() + } else { + 0 + }; + + // We keep track here of how many times spawn has been called. + // It can be called without limit, each time, putting a + // request for a new thread to execute a method on the + // ThreadPool. However, if we allow it to be called without + // limits, we run the risk of memory exhaustion due to limited + // stack space consumed by all of the pending closures to be + // executed. + let previous_count = WORKER_SPAWN_COUNTER.fetch_add(1, Ordering::SeqCst); + + // If the number of spawns requested has exceeded the number + // of cores available for processing by some factor (the + // default being 4), instead of requesting that we spawn a new + // thread, we instead execute the closure in the context of an + // install call to help clear the growing work queue and + // minimize the chances of memory exhaustion. + if previous_count > *WORKER_SPAWN_MAX_COUNT { + THREAD_POOL.install(move || { + trace!("[{}] switching to install to help clear backlog[current threads {}, threads requested {}]", + thread_index, + THREAD_POOL.current_num_threads(), + WORKER_SPAWN_COUNTER.load(Ordering::SeqCst)); + let res = f(); + sender.send(res).unwrap(); + WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst); + }); + } else { + THREAD_POOL.spawn(move || { + trace!( + "[{}] pool is using spawn [current threads {}, threads requested {}]", + thread_index, + THREAD_POOL.current_num_threads(), + WORKER_SPAWN_COUNTER.load(Ordering::SeqCst), + ); + let res = f(); + sender.send(res).unwrap(); + WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst); + }); + } Waiter { receiver } } From 103113a7d55c860ef3d59149f3e0419a7954f1b2 Mon Sep 17 00:00:00 2001 From: nemo Date: Mon, 8 Mar 2021 15:17:50 -0500 Subject: [PATCH 07/23] style: remove verbose logging (cherry picked from commit 007487fa7628547642f4e1cab23c4989ae9513ee) --- src/multicore.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index 0e7cb7f72..955f00523 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -87,12 +87,6 @@ mod implementation { }); } else { THREAD_POOL.spawn(move || { - trace!( - "[{}] pool is using spawn [current threads {}, threads requested {}]", - thread_index, - THREAD_POOL.current_num_threads(), - WORKER_SPAWN_COUNTER.load(Ordering::SeqCst), - ); let res = f(); sender.send(res).unwrap(); WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst); From 360788dacd8f184f8185e61d4c8b058c3e03d27f Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 1 Jun 2021 01:42:49 +0100 Subject: [PATCH 08/23] Bump crossbeam-channel dependence --- Cargo.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index edc70aca9..973a7e883 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,15 +17,16 @@ bitvec = "0.22" blake2s_simd = "0.5" ff = "0.10" group = "0.10" -num_cpus = { version = "1", optional = true } pairing = { version = "0.20", optional = true } rand_core = "0.6" byteorder = "1" subtle = "2.2.1" -crossbeam-channel = { version = "0.4.4", optional = true } +# Multicore dependencies +crossbeam-channel = { version = "0.5", optional = true } lazy_static = { version = "1.4.0", optional = true } log = { version = "0.4.8", optional = true } +num_cpus = { version = "1", optional = true } rayon = { version = "1.3.0", optional = true } [dev-dependencies] From 9f05fec2c718d3ad8ebc2e1b4788154d4d4c2802 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 1 Jun 2021 01:48:04 +0100 Subject: [PATCH 09/23] multicore: Simplify NUM_CPUS definition --- src/multicore.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index 955f00523..ad4ed12e3 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -18,15 +18,10 @@ mod implementation { static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0); lazy_static! { - static ref NUM_CPUS: usize = if let Ok(num) = env::var("BELLMAN_NUM_CPUS") { - if let Ok(num) = num.parse() { - num - } else { - num_cpus::get() - } - } else { - num_cpus::get() - }; + static ref NUM_CPUS: usize = env::var("BELLMAN_NUM_CPUS") + .map_err(|_| ()) + .and_then(|num| num.parse().map_err(|_| ())) + .unwrap_or_else(|_| num_cpus::get()); // See Worker::compute below for a description of this. static ref WORKER_SPAWN_MAX_COUNT: usize = *NUM_CPUS * 4; pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() From d7ec539545bc842bae16c07b4998d3a337b3bbcc Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 1 Jun 2021 02:08:30 +0100 Subject: [PATCH 10/23] Update changelog with multicore changes --- CHANGELOG.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96bc05368..e98985179 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ and this project adheres to Rust's notion of [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- `BELLMAN_NUM_CPUS` environment variable, which can be used to control the + number of logical CPUs that `bellman` will use when the (default) `multicore` + feature flag is enabled. +- `bellman::multicore::Waiter` + +### Changed +- `bellman::multicore` has migrated from `crossbeam` to `rayon`: + - `bellman::multicore::Worker::compute` now returns + `bellman::multicore::Waiter`. + - `bellman::multiexp::multiexp` now returns + `bellman::multicore::Waiter>` instead of + `Box>`. + +### Removed +- `bellman::multicore::WorkerFuture` (replaced by `Waiter`). ## [0.10.0] - 2021-06-04 ### Added From 510b908f22b13b3844c2a8cd0675a323f2c64282 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 1 Jun 2021 02:12:52 +0100 Subject: [PATCH 11/23] CI: Build for wasm32-wasi target to ensure no-multicore works --- .github/workflows/ci.yml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ddb5cbb2c..677d24c22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,29 @@ jobs: command: test args: --verbose --release + build: + name: Build target ${{ matrix.target }} + runs-on: ubuntu-latest + strategy: + matrix: + target: + - wasm32-wasi + + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: 1.51.0 + override: true + - name: Add target + run: rustup target add ${{ matrix.target }} + - name: cargo fetch + uses: actions-rs/cargo@v1 + with: + command: fetch + - name: Build for target + run: cargo build --verbose --no-default-features --target ${{ matrix.target }} + bitrot: name: Bitrot check runs-on: ubuntu-latest From 6e9f575854706139d0f5ed32699db62cc0781746 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 1 Jun 2021 02:32:59 +0100 Subject: [PATCH 12/23] Remove broken intra-doc link --- src/multicore.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index ad4ed12e3..19cec73fe 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -1,9 +1,6 @@ //! An interface for dealing with the kinds of parallel computations involved in -//! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and -//! [`rayon`] but may be extended in the future to allow for various -//! parallelism strategies. -//! -//! [`CpuPool`]: futures_cpupool::CpuPool +//! `bellman`. It's currently just a thin wrapper around [`rayon`] but may be +//! extended in the future to allow for various parallelism strategies. #[cfg(feature = "multicore")] mod implementation { From 22caab94657e39f69d76427a105d4ce5f63d71ce Mon Sep 17 00:00:00 2001 From: str4d Date: Tue, 1 Jun 2021 13:11:09 +0100 Subject: [PATCH 13/23] multicore: Unconditionally panic if Waiter::wait is called from threadpool This occurring is a programming error. Co-authored-by: Daira Hopwood --- src/multicore.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index 19cec73fe..428b38c41 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -111,9 +111,10 @@ mod implementation { /// Wait for the result. pub fn wait(&self) -> T { if THREAD_POOL.current_thread_index().is_some() { - // Calling `wait()` from within the worker thread pool can lead to dead logs - error!("The wait call should never be done inside the worker thread pool"); - debug_assert!(false); + let msg = "wait() cannot be called from within the worker thread pool since that would lead to deadlocks"; + // panic! doesn't necessarily kill the process, so we log as well. + error!("{}", msg); + panic!("{}", msg); } self.receiver.recv().unwrap() } From a4b4e1443d2c0730c600a2ab5510d02c6b959ec7 Mon Sep 17 00:00:00 2001 From: str4d Date: Tue, 1 Jun 2021 13:48:18 +0100 Subject: [PATCH 14/23] multicore: Code simplification Co-authored-by: Daira Hopwood --- src/multicore.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index 428b38c41..149829e11 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -45,12 +45,7 @@ mod implementation { R: Send + 'static, { let (sender, receiver) = bounded(1); - - let thread_index = if THREAD_POOL.current_thread_index().is_some() { - THREAD_POOL.current_thread_index().unwrap() - } else { - 0 - }; + let thread_index = THREAD_POOL.current_thread_index().unwrap_or(0); // We keep track here of how many times spawn has been called. // It can be called without limit, each time, putting a From 7d81964644566803db517cd2afd2c31c4dfa2afa Mon Sep 17 00:00:00 2001 From: str4d Date: Tue, 1 Jun 2021 13:49:32 +0100 Subject: [PATCH 15/23] multicore: Documentation tweaks Co-authored-by: Daira Hopwood --- CHANGELOG.md | 3 ++- src/multicore.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e98985179..c0896ed8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to Rust's notion of ### Added - `BELLMAN_NUM_CPUS` environment variable, which can be used to control the number of logical CPUs that `bellman` will use when the (default) `multicore` - feature flag is enabled. + feature flag is enabled. The default (which has not changed) is to use the + `num_cpus` crate to determine the number of logical CPUs. - `bellman::multicore::Waiter` ### Changed diff --git a/src/multicore.rs b/src/multicore.rs index 149829e11..f7c0430b2 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -114,7 +114,7 @@ mod implementation { self.receiver.recv().unwrap() } - /// One off sending. + /// One-off sending. pub fn done(val: T) -> Self { let (sender, receiver) = bounded(1); sender.send(val).unwrap(); @@ -185,10 +185,10 @@ mod implementation { impl Waiter { /// Wait for the result. pub fn wait(&mut self) -> T { - self.val.take().unwrap() + self.val.take().expect("unmet data dependency") } - /// One off sending. + /// One-off sending. pub fn done(val: T) -> Self { Waiter { val: Some(val) } } From 6916c3b84d9bc5e39ac9149e7e2bf287fa1345dc Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Sun, 15 Aug 2021 09:56:54 +0100 Subject: [PATCH 16/23] Update dependencies. Signed-off-by: Daira Hopwood --- Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 973a7e883..c984cdb1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,11 @@ byteorder = "1" subtle = "2.2.1" # Multicore dependencies -crossbeam-channel = { version = "0.5", optional = true } +crossbeam-channel = { version = "0.5.1", optional = true } lazy_static = { version = "1.4.0", optional = true } -log = { version = "0.4.8", optional = true } -num_cpus = { version = "1", optional = true } -rayon = { version = "1.3.0", optional = true } +log = { version = "0.4.14", optional = true } +num_cpus = { version = "1.13", optional = true } +rayon = { version = "1.5.1", optional = true } [dev-dependencies] bls12_381 = "0.5" From 181504272d0265f9c1be243ba8bed600f8e747bf Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Sun, 15 Aug 2021 10:12:46 +0100 Subject: [PATCH 17/23] Use global threadpool unless we exceed `WORKER_SPAWN_MAX_COUNT`. This is expected to play more nicely with other uses of rayon in the same process (avoiding excess parallelism). Signed-off-by: Daira Hopwood --- CHANGELOG.md | 9 +++++---- src/multicore.rs | 45 ++++++++++++++++++++++----------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0896ed8d..a07321207 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,11 @@ and this project adheres to Rust's notion of ## [Unreleased] ### Added -- `BELLMAN_NUM_CPUS` environment variable, which can be used to control the - number of logical CPUs that `bellman` will use when the (default) `multicore` - feature flag is enabled. The default (which has not changed) is to use the - `num_cpus` crate to determine the number of logical CPUs. +- `bellman` now uses `rayon` for multithreading when the (default) `multicore` + feature flag is enabled. This means that, when this flag is enabled, the + `RAYON_NUM_THREADS` environment variable controls the number of threads that + `bellman` will use. The default, which has not changed, is to use the same + number of threads as logical CPUs. - `bellman::multicore::Waiter` ### Changed diff --git a/src/multicore.rs b/src/multicore.rs index f7c0430b2..d45ef8ce0 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -4,30 +4,25 @@ #[cfg(feature = "multicore")] mod implementation { - use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_channel::{bounded, Receiver}; use lazy_static::lazy_static; use log::{error, trace}; - use num_cpus; + use rayon::current_num_threads; static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0); lazy_static! { - static ref NUM_CPUS: usize = env::var("BELLMAN_NUM_CPUS") - .map_err(|_| ()) - .and_then(|num| num.parse().map_err(|_| ())) - .unwrap_or_else(|_| num_cpus::get()); // See Worker::compute below for a description of this. - static ref WORKER_SPAWN_MAX_COUNT: usize = *NUM_CPUS * 4; - pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(*NUM_CPUS) + static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4; + static ref OVERFLOW_THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() + .num_threads(current_num_threads()) .build() .unwrap(); } - #[derive(Clone)] + #[derive(Clone, Default)] pub struct Worker {} impl Worker { @@ -36,7 +31,7 @@ mod implementation { } pub fn log_num_cpus(&self) -> u32 { - log2_floor(*NUM_CPUS) + log2_floor(current_num_threads()) } pub fn compute(&self, f: F) -> Waiter @@ -45,7 +40,6 @@ mod implementation { R: Send + 'static, { let (sender, receiver) = bounded(1); - let thread_index = THREAD_POOL.current_thread_index().unwrap_or(0); // We keep track here of how many times spawn has been called. // It can be called without limit, each time, putting a @@ -63,17 +57,20 @@ mod implementation { // install call to help clear the growing work queue and // minimize the chances of memory exhaustion. if previous_count > *WORKER_SPAWN_MAX_COUNT { - THREAD_POOL.install(move || { - trace!("[{}] switching to install to help clear backlog[current threads {}, threads requested {}]", - thread_index, - THREAD_POOL.current_num_threads(), - WORKER_SPAWN_COUNTER.load(Ordering::SeqCst)); + let thread_index = rayon::current_thread_index().unwrap_or(0); + OVERFLOW_THREAD_POOL.install(move || { + trace!("[{}, {}] switching to install to help clear backlog [threads: current {}, overflow {}, requested {}]", + thread_index, + OVERFLOW_THREAD_POOL.current_thread_index().unwrap_or(0), + current_num_threads(), + OVERFLOW_THREAD_POOL.current_num_threads(), + WORKER_SPAWN_COUNTER.load(Ordering::SeqCst)); let res = f(); sender.send(res).unwrap(); WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst); }); } else { - THREAD_POOL.spawn(move || { + rayon::spawn(move || { let res = f(); sender.send(res).unwrap(); WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst); @@ -88,13 +85,14 @@ mod implementation { F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send, R: Send, { - let chunk_size = if elements < *NUM_CPUS { + let num_threads = current_num_threads(); + let chunk_size = if elements < num_threads { 1 } else { - elements / *NUM_CPUS + elements / num_threads }; - THREAD_POOL.scope(|scope| f(scope, chunk_size)) + rayon::scope(|scope| f(scope, chunk_size)) } } @@ -105,8 +103,9 @@ mod implementation { impl Waiter { /// Wait for the result. pub fn wait(&self) -> T { - if THREAD_POOL.current_thread_index().is_some() { - let msg = "wait() cannot be called from within the worker thread pool since that would lead to deadlocks"; + // This will be Some if this thread is in either the global or overflow thread pool. + if rayon::current_thread_index().is_some() { + let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks"; // panic! doesn't necessarily kill the process, so we log as well. error!("{}", msg); panic!("{}", msg); From 1e3974388ae8e1c3341dca99d9464208062155bb Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Sun, 15 Aug 2021 10:24:40 +0100 Subject: [PATCH 18/23] Rename `multicore::log_num_cpus` to `log_num_threads`. Signed-off-by: Daira Hopwood --- CHANGELOG.md | 1 + src/domain.rs | 2 +- src/multicore.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a07321207..0da3a9cc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to Rust's notion of - `bellman::multiexp::multiexp` now returns `bellman::multicore::Waiter>` instead of `Box>`. + - `bellman::multicore::log_num_cpus` is renamed to `log_num_threads`. ### Removed - `bellman::multicore::WorkerFuture` (replaced by `Waiter`). diff --git a/src/domain.rs b/src/domain.rs index fa02adce9..dc27c565b 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -259,7 +259,7 @@ impl Group for Scalar { } fn best_fft>(a: &mut [T], worker: &Worker, omega: &S, log_n: u32) { - let log_cpus = worker.log_num_cpus(); + let log_cpus = worker.log_num_threads(); if log_n <= log_cpus { serial_fft(a, omega, log_n); diff --git a/src/multicore.rs b/src/multicore.rs index d45ef8ce0..e66c8b1d0 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -30,7 +30,7 @@ mod implementation { Worker {} } - pub fn log_num_cpus(&self) -> u32 { + pub fn log_num_threads(&self) -> u32 { log2_floor(current_num_threads()) } From 313245ba9601e9221b3be2a1fca6c9bbc112d41f Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Wed, 25 Aug 2021 16:56:33 +0100 Subject: [PATCH 19/23] Relax dependencies for `log` and `num_cpus` --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c984cdb1d..7b7626baa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,8 @@ subtle = "2.2.1" # Multicore dependencies crossbeam-channel = { version = "0.5.1", optional = true } lazy_static = { version = "1.4.0", optional = true } -log = { version = "0.4.14", optional = true } -num_cpus = { version = "1.13", optional = true } +log = { version = "0.4", optional = true } +num_cpus = { version = "1", optional = true } rayon = { version = "1.5.1", optional = true } [dev-dependencies] From e3a4ea6882fc42f58141fac3ad048ff864e4530d Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Thu, 26 Aug 2021 17:53:12 +0100 Subject: [PATCH 20/23] Simplify by removing the overflow thread pool Co-authored-by: str4d --- src/multicore.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index e66c8b1d0..345b52781 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -16,10 +16,6 @@ mod implementation { lazy_static! { // See Worker::compute below for a description of this. static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4; - static ref OVERFLOW_THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(current_num_threads()) - .build() - .unwrap(); } #[derive(Clone, Default)] @@ -58,12 +54,10 @@ mod implementation { // minimize the chances of memory exhaustion. if previous_count > *WORKER_SPAWN_MAX_COUNT { let thread_index = rayon::current_thread_index().unwrap_or(0); - OVERFLOW_THREAD_POOL.install(move || { - trace!("[{}, {}] switching to install to help clear backlog [threads: current {}, overflow {}, requested {}]", + rayon::scope(move |_| { + trace!("[{}] switching to scope to help clear backlog [threads: current {}, requested {}]", thread_index, - OVERFLOW_THREAD_POOL.current_thread_index().unwrap_or(0), current_num_threads(), - OVERFLOW_THREAD_POOL.current_num_threads(), WORKER_SPAWN_COUNTER.load(Ordering::SeqCst)); let res = f(); sender.send(res).unwrap(); @@ -103,7 +97,7 @@ mod implementation { impl Waiter { /// Wait for the result. pub fn wait(&self) -> T { - // This will be Some if this thread is in either the global or overflow thread pool. + // This will be Some if this thread is in the global thread pool. if rayon::current_thread_index().is_some() { let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks"; // panic! doesn't necessarily kill the process, so we log as well. From a94da4223b9a2858e0da5af289126fdebb27fc14 Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Thu, 26 Aug 2021 20:05:21 +0100 Subject: [PATCH 21/23] Bugfix: `log_num_cpus` should be renamed also for the dummy (non-multicore) implementation. Signed-off-by: Daira Hopwood --- src/multicore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multicore.rs b/src/multicore.rs index 345b52781..6326d4e40 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -151,7 +151,7 @@ mod implementation { Worker } - pub fn log_num_cpus(&self) -> u32 { + pub fn log_num_threads(&self) -> u32 { 0 } From 5e68a2e257a665ab50c7d2cc3e588dc31267ecb8 Mon Sep 17 00:00:00 2001 From: str4d Date: Fri, 27 Aug 2021 14:13:19 +0100 Subject: [PATCH 22/23] Fix code comment after switch to global threadpool --- src/multicore.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/multicore.rs b/src/multicore.rs index 6326d4e40..18d73e6a5 100644 --- a/src/multicore.rs +++ b/src/multicore.rs @@ -49,9 +49,10 @@ mod implementation { // If the number of spawns requested has exceeded the number // of cores available for processing by some factor (the // default being 4), instead of requesting that we spawn a new - // thread, we instead execute the closure in the context of an - // install call to help clear the growing work queue and - // minimize the chances of memory exhaustion. + // thread, we instead execute the closure in the context of a + // scope call (which blocks the current thread) to help clear + // the growing work queue and minimize the chances of memory + // exhaustion. if previous_count > *WORKER_SPAWN_MAX_COUNT { let thread_index = rayon::current_thread_index().unwrap_or(0); rayon::scope(move |_| { From 2c9655639a9ce9c94c7dc98f10e8dab382ffc496 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Thu, 2 Sep 2021 18:39:25 +0100 Subject: [PATCH 23/23] Remove println! statements from test --- src/multiexp.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/multiexp.rs b/src/multiexp.rs index 3089e7c44..cc68a9625 100644 --- a/src/multiexp.rs +++ b/src/multiexp.rs @@ -313,15 +313,10 @@ fn test_with_bls12() { .collect::>(), ); - let now = std::time::Instant::now(); let naive: ::G1 = naive_multiexp(g.clone(), v.clone()); - println!("Naive: {}", now.elapsed().as_millis()); - let now = std::time::Instant::now(); let pool = Worker::new(); - let fast = multiexp(&pool, (g, 0), FullDensity, v_bits).wait().unwrap(); - println!("Fast: {}", now.elapsed().as_millis()); assert_eq!(naive, fast); }