diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ddb5cbb2c..677d24c22 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -31,6 +31,29 @@ jobs:
           command: test
           args: --verbose --release
 
+  build:
+    name: Build target ${{ matrix.target }}
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        target:
+          - wasm32-wasi
+
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: 1.51.0
+          override: true
+      - name: Add target
+        run: rustup target add ${{ matrix.target }}
+      - name: cargo fetch
+        uses: actions-rs/cargo@v1
+        with:
+          command: fetch
+      - name: Build for target
+        run: cargo build --verbose --no-default-features --target ${{ matrix.target }}
+
   bitrot:
     name: Bitrot check
     runs-on: ubuntu-latest
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 96bc05368..0da3a9cc3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,25 @@ and this project adheres to Rust's notion of
 [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Added
+- `bellman` now uses `rayon` for multithreading when the (default) `multicore`
+  feature flag is enabled. This means that, when this flag is enabled, the
+  `RAYON_NUM_THREADS` environment variable controls the number of threads that
+  `bellman` will use. The default, which has not changed, is to use the same
+  number of threads as logical CPUs.
+- `bellman::multicore::Waiter`
+
+### Changed
+- `bellman::multicore` has migrated from `crossbeam` to `rayon`:
+  - `bellman::multicore::Worker::compute` now returns
+    `bellman::multicore::Waiter`.
+  - `bellman::multiexp::multiexp` now returns
+    `bellman::multicore::Waiter<Result<G, SynthesisError>>` instead of
+    `Box<dyn Future<Item = G, Error = SynthesisError>>`.
+  - `bellman::multicore::log_num_cpus` is renamed to `log_num_threads`.
+
+### Removed
+- `bellman::multicore::WorkerFuture` (replaced by `Waiter`).
 
 ## [0.10.0] - 2021-06-04
 ### Added
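Note: the changelog entries above describe the new blocking API surface. A minimal sketch of what a downstream caller sees under the default `multicore` feature (the closure and values here are illustrative assumptions, not code from this diff):

```rust
use bellman::multicore::Worker;

fn main() {
    // With the default `multicore` feature, rayon sizes its global pool
    // from RAYON_NUM_THREADS, or the logical CPU count if unset.
    let worker = Worker::new();

    // `compute` queues the closure on the rayon pool and hands back a
    // `Waiter` instead of a futures-0.1 future.
    let waiter = worker.compute(|| (0u64..100).sum::<u64>());

    // `wait` blocks until the result arrives; per src/multicore.rs below,
    // it must be called from outside the rayon pool.
    assert_eq!(waiter.wait(), 4950);
}
```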
diff --git a/Cargo.toml b/Cargo.toml
index c4a1e662e..7b7626baa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,16 +16,19 @@ edition = "2018"
 bitvec = "0.22"
 blake2s_simd = "0.5"
 ff = "0.10"
-futures = "0.1"
-futures-cpupool = { version = "0.1", optional = true }
 group = "0.10"
-num_cpus = { version = "1", optional = true }
-crossbeam = { version = "0.7", optional = true }
 pairing = { version = "0.20", optional = true }
 rand_core = "0.6"
 byteorder = "1"
 subtle = "2.2.1"
 
+# Multicore dependencies
+crossbeam-channel = { version = "0.5.1", optional = true }
+lazy_static = { version = "1.4.0", optional = true }
+log = { version = "0.4", optional = true }
+num_cpus = { version = "1", optional = true }
+rayon = { version = "1.5.1", optional = true }
+
 [dev-dependencies]
 bls12_381 = "0.5"
 criterion = "0.3"
@@ -36,7 +39,7 @@ sha2 = "0.9"
 
 [features]
 groth16 = ["pairing"]
-multicore = ["futures-cpupool", "crossbeam", "num_cpus"]
+multicore = ["crossbeam-channel", "lazy_static", "log", "num_cpus", "rayon"]
 default = ["groth16", "multicore"]
 
 [[test]]
diff --git a/benches/slow.rs b/benches/slow.rs
index 542f45c21..b36099ecb 100644
--- a/benches/slow.rs
+++ b/benches/slow.rs
@@ -5,7 +5,6 @@ use bellman::{
 use bls12_381::{Bls12, Scalar};
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
 use ff::{Field, PrimeFieldBits};
-use futures::Future;
 use group::{Curve, Group};
 use pairing::Engine;
 use rand_core::SeedableRng;
diff --git a/src/domain.rs b/src/domain.rs
index fa02adce9..dc27c565b 100644
--- a/src/domain.rs
+++ b/src/domain.rs
@@ -259,7 +259,7 @@ impl<S: PrimeField> Group<S> for Scalar<S> {
 }
 
 fn best_fft<S: PrimeField, T: Group<S>>(a: &mut [T], worker: &Worker, omega: &S, log_n: u32) {
-    let log_cpus = worker.log_num_cpus();
+    let log_cpus = worker.log_num_threads();
 
     if log_n <= log_cpus {
         serial_fft(a, omega, log_n);
diff --git a/src/groth16/prover.rs b/src/groth16/prover.rs
index 20f348756..197e5dcbe 100644
--- a/src/groth16/prover.rs
+++ b/src/groth16/prover.rs
@@ -2,8 +2,6 @@ use rand_core::RngCore;
 use std::ops::{AddAssign, MulAssign};
 use std::sync::Arc;
 
-use futures::Future;
-
 use ff::{Field, PrimeField, PrimeFieldBits};
 use group::{prime::PrimeCurveAffine, Curve};
 use pairing::Engine;
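Note: the `src/domain.rs` hunk is only a rename, but the threshold it feeds is worth spelling out. A simplified sketch of the dispatch inside `best_fft` (`choose_fft` is a stand-in name for illustration; `log2_floor` mirrors the helper in `src/multicore.rs`):

```rust
// Returns the largest k such that 2^k <= num.
fn log2_floor(num: usize) -> u32 {
    assert!(num > 0);
    let mut pow = 0;
    while (1usize << (pow + 1)) <= num {
        pow += 1;
    }
    pow
}

// best_fft splits a 2^log_n-point FFT into 2^log_threads independent
// sub-FFTs, so the parallel path only pays off once log_n exceeds
// log2 of the thread count.
fn choose_fft(log_n: u32, num_threads: usize) -> &'static str {
    if log_n <= log2_floor(num_threads) {
        "serial_fft"
    } else {
        "parallel_fft"
    }
}

fn main() {
    assert_eq!(choose_fft(3, 8), "serial_fft"); // 2^3 points, 8 threads
    assert_eq!(choose_fft(12, 8), "parallel_fft"); // 2^12 points, 8 threads
}
```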
diff --git a/src/multicore.rs b/src/multicore.rs
index ba69b5f33..18d73e6a5 100644
--- a/src/multicore.rs
+++ b/src/multicore.rs
@@ -1,81 +1,119 @@
 //! An interface for dealing with the kinds of parallel computations involved in
-//! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and
-//! [`crossbeam`] but may be extended in the future to allow for various
-//! parallelism strategies.
-//!
-//! [`CpuPool`]: futures_cpupool::CpuPool
+//! `bellman`. It's currently just a thin wrapper around [`rayon`] but may be
+//! extended in the future to allow for various parallelism strategies.
 
 #[cfg(feature = "multicore")]
 mod implementation {
-    use crossbeam::{self, thread::Scope};
-    use futures::{Future, IntoFuture, Poll};
-    use futures_cpupool::{CpuFuture, CpuPool};
-    use num_cpus;
+    use std::sync::atomic::{AtomicUsize, Ordering};
 
-    #[derive(Clone)]
-    pub struct Worker {
-        cpus: usize,
-        pool: CpuPool,
+    use crossbeam_channel::{bounded, Receiver};
+    use lazy_static::lazy_static;
+    use log::{error, trace};
+    use rayon::current_num_threads;
+
+    static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+    lazy_static! {
+        // See Worker::compute below for a description of this.
+        static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4;
     }
 
-    impl Worker {
-        // We don't expose this outside the library so that
-        // all `Worker` instances have the same number of
-        // CPUs configured.
-        pub(crate) fn new_with_cpus(cpus: usize) -> Worker {
-            Worker {
-                cpus,
-                pool: CpuPool::new(cpus),
-            }
-        }
+    #[derive(Clone, Default)]
+    pub struct Worker {}
 
+    impl Worker {
         pub fn new() -> Worker {
-            Self::new_with_cpus(num_cpus::get())
+            Worker {}
         }
 
-        pub fn log_num_cpus(&self) -> u32 {
-            log2_floor(self.cpus)
+        pub fn log_num_threads(&self) -> u32 {
+            log2_floor(current_num_threads())
         }
 
-        pub fn compute<F, R>(&self, f: F) -> WorkerFuture<R::Item, R::Error>
+        pub fn compute<F, R>(&self, f: F) -> Waiter<R>
         where
             F: FnOnce() -> R + Send + 'static,
-            R: IntoFuture + 'static,
-            R::Future: Send + 'static,
-            R::Item: Send + 'static,
-            R::Error: Send + 'static,
+            R: Send + 'static,
         {
-            WorkerFuture {
-                future: self.pool.spawn_fn(f),
+            let (sender, receiver) = bounded(1);
+
+            // We keep track here of how many times spawn has been called.
+            // It can be called without limit, each time, putting a
+            // request for a new thread to execute a method on the
+            // ThreadPool. However, if we allow it to be called without
+            // limits, we run the risk of memory exhaustion due to limited
+            // stack space consumed by all of the pending closures to be
+            // executed.
+            let previous_count = WORKER_SPAWN_COUNTER.fetch_add(1, Ordering::SeqCst);
+
+            // If the number of spawns requested has exceeded the number
+            // of cores available for processing by some factor (the
+            // default being 4), instead of requesting that we spawn a new
+            // thread, we instead execute the closure in the context of a
+            // scope call (which blocks the current thread) to help clear
+            // the growing work queue and minimize the chances of memory
+            // exhaustion.
+            if previous_count > *WORKER_SPAWN_MAX_COUNT {
+                let thread_index = rayon::current_thread_index().unwrap_or(0);
+                rayon::scope(move |_| {
+                    trace!("[{}] switching to scope to help clear backlog [threads: current {}, requested {}]",
+                        thread_index,
+                        current_num_threads(),
+                        WORKER_SPAWN_COUNTER.load(Ordering::SeqCst));
+                    let res = f();
+                    sender.send(res).unwrap();
+                    WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
+                });
+            } else {
+                rayon::spawn(move || {
+                    let res = f();
+                    sender.send(res).unwrap();
+                    WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
+                });
             }
+
+            Waiter { receiver }
         }
 
         pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R
         where
-            F: FnOnce(&Scope<'a>, usize) -> R,
+            F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send,
+            R: Send,
         {
-            let chunk_size = if elements < self.cpus {
+            let num_threads = current_num_threads();
+            let chunk_size = if elements < num_threads {
                 1
             } else {
-                elements / self.cpus
+                elements / num_threads
             };
 
-            // TODO: Handle case where threads fail
-            crossbeam::scope(|scope| f(scope, chunk_size))
-                .expect("Threads aren't allowed to fail yet")
+            rayon::scope(|scope| f(scope, chunk_size))
         }
     }
 
-    pub struct WorkerFuture<T, E> {
-        future: CpuFuture<T, E>,
+    pub struct Waiter<T> {
+        receiver: Receiver<T>,
     }
 
-    impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
-        type Item = T;
-        type Error = E;
+    impl<T: Send + 'static> Waiter<T> {
+        /// Wait for the result.
+        pub fn wait(&self) -> T {
+            // This will be Some if this thread is in the global thread pool.
+            if rayon::current_thread_index().is_some() {
+                let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks";
+                // panic! doesn't necessarily kill the process, so we log as well.
+                error!("{}", msg);
+                panic!("{}", msg);
+            }
+            self.receiver.recv().unwrap()
+        }
+
+        /// One-off sending.
+        pub fn done(val: T) -> Self {
+            let (sender, receiver) = bounded(1);
+            sender.send(val).unwrap();
 
-        fn poll(&mut self) -> Poll<T, E> {
-            self.future.poll()
+            Waiter { receiver }
        }
     }
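Note: the comments in `Worker::compute` above carry the key idea of this hunk. A standalone miniature of the same backpressure pattern, with simplified names and an arbitrary threshold (an illustrative sketch, not bellman's internals):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Number of closures handed to rayon that have not finished yet.
static IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);

fn bounded_spawn<F: FnOnce() + Send + 'static>(max_backlog: usize, f: F) {
    let previous = IN_FLIGHT.fetch_add(1, Ordering::SeqCst);
    if previous > max_backlog {
        // Backlog too deep: run the closure under a blocking scope on the
        // current thread, draining the queue instead of growing it.
        rayon::scope(move |_| {
            f();
            IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
        });
    } else {
        // Backlog acceptable: queue the closure asynchronously.
        rayon::spawn(move || {
            f();
            IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
        });
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    for i in 0..32u64 {
        let tx = tx.clone();
        bounded_spawn(8, move || tx.send(i * i).unwrap());
    }
    drop(tx);
    // 0^2 + 1^2 + ... + 31^2
    assert_eq!(rx.iter().sum::<u64>(), 10416);
}
```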
@@ -106,8 +144,6 @@
 
 #[cfg(not(feature = "multicore"))]
 mod implementation {
-    use futures::{future, Future, IntoFuture, Poll};
-
     #[derive(Clone)]
     pub struct Worker;
 
@@ -116,19 +152,16 @@
             Worker
         }
 
-        pub fn log_num_cpus(&self) -> u32 {
+        pub fn log_num_threads(&self) -> u32 {
             0
         }
 
-        pub fn compute<F, R>(&self, f: F) -> R::Future
+        pub fn compute<F, R>(&self, f: F) -> Waiter<R>
         where
             F: FnOnce() -> R + Send + 'static,
-            R: IntoFuture + 'static,
-            R::Future: Send + 'static,
-            R::Item: Send + 'static,
-            R::Error: Send + 'static,
+            R: Send + 'static,
         {
-            f().into_future()
+            Waiter::done(f())
         }
 
         pub fn scope<F, R>(&self, elements: usize, f: F) -> R
@@ -139,16 +172,19 @@
         }
     }
 
-    pub struct WorkerFuture<T, E> {
-        future: future::FutureResult<T, E>,
+    pub struct Waiter<T> {
+        val: Option<T>,
     }
 
-    impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
-        type Item = T;
-        type Error = E;
+    impl<T> Waiter<T> {
+        /// Wait for the result.
+        pub fn wait(&mut self) -> T {
+            self.val.take().expect("unmet data dependency")
+        }
 
-        fn poll(&mut self) -> Poll<T, E> {
-            self.future.poll()
+        /// One-off sending.
+        pub fn done(val: T) -> Self {
+            Waiter { val: Some(val) }
         }
     }
@@ -159,6 +195,21 @@
             f(self);
         }
     }
+
+    /// A fake rayon ParallelIterator that is just a serial iterator.
+    pub(crate) trait FakeParallelIterator {
+        type Iter: Iterator<Item = Self::Item>;
+        type Item: Send;
+        fn into_par_iter(self) -> Self::Iter;
+    }
+
+    impl FakeParallelIterator for core::ops::Range<u32> {
+        type Iter = Self;
+        type Item = u32;
+        fn into_par_iter(self) -> Self::Iter {
+            self
+        }
+    }
 }
 
 pub use self::implementation::*;
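Note: the `FakeParallelIterator` shim added above exists so that call sites compile unchanged with or without the `multicore` feature. A self-contained rendition of both branches (assuming a crate that mirrors bellman's `multicore` feature and optional rayon dependency):

```rust
#[cfg(feature = "multicore")]
use rayon::prelude::*;

// Without `multicore`, the same method name resolves to this serial
// stand-in, so the call site below needs no cfg of its own.
#[cfg(not(feature = "multicore"))]
trait FakeParallelIterator {
    type Iter: Iterator<Item = Self::Item>;
    type Item: Send;
    fn into_par_iter(self) -> Self::Iter;
}

#[cfg(not(feature = "multicore"))]
impl FakeParallelIterator for core::ops::Range<u32> {
    type Iter = Self;
    type Item = u32;
    fn into_par_iter(self) -> Self::Iter {
        self
    }
}

fn sum_of_squares() -> u32 {
    // Parallel with `multicore`, serial without; identical source text.
    (0..10u32).into_par_iter().map(|i| i * i).sum()
}

fn main() {
    assert_eq!(sum_of_squares(), 285);
}
```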
diff --git a/src/multiexp.rs b/src/multiexp.rs
index 4aa235882..cc68a9625 100644
--- a/src/multiexp.rs
+++ b/src/multiexp.rs
@@ -1,13 +1,18 @@
-use super::multicore::Worker;
+use super::multicore::{Waiter, Worker};
 use bitvec::vec::BitVec;
 use ff::{FieldBits, PrimeField, PrimeFieldBits};
-use futures::Future;
 use group::prime::{PrimeCurve, PrimeCurveAffine};
 use std::io;
 use std::iter;
 use std::ops::AddAssign;
 use std::sync::Arc;
 
+#[cfg(feature = "multicore")]
+use rayon::prelude::*;
+
+#[cfg(not(feature = "multicore"))]
+use crate::multicore::FakeParallelIterator;
+
 use super::SynthesisError;
 
 /// An object that builds a source of bases.
@@ -146,14 +151,11 @@ impl DensityTracker {
     }
 }
 
 fn multiexp_inner<Q, D, G, S>(
-    pool: &Worker,
     bases: S,
     density_map: D,
     exponents: Arc<Vec<<G::Scalar as PrimeField>::ReprBits>>,
-    mut skip: u32,
     c: u32,
-    handle_trivial: bool,
-) -> Box<dyn Future<Item = G, Error = SynthesisError>>
+) -> Result<G, SynthesisError>
 where
     for<'a> &'a Q: QueryDensity,
     D: Send + Sync + 'static + Clone + AsRef<Q>,
@@ -162,100 +164,83 @@ where
     S: SourceBuilder<<G as PrimeCurve>::Affine>,
 {
     // Perform this region of the multiexp
-    let this = {
-        let bases = bases.clone();
-        let exponents = exponents.clone();
-        let density_map = density_map.clone();
-
-        pool.compute(move || {
-            // Accumulate the result
-            let mut acc = G::identity();
-
-            // Build a source for the bases
-            let mut bases = bases.new();
-
-            // Create space for the buckets
-            let mut buckets = vec![G::identity(); (1 << c) - 1];
-
-            // Sort the bases into buckets
-            for (exp, density) in exponents.iter().zip(density_map.as_ref().iter()) {
-                if density {
-                    let (exp_is_zero, exp_is_one) = {
-                        let (first, rest) = exp.split_first().unwrap();
-                        let rest_unset = rest.not_any();
-                        (!*first && rest_unset, *first && rest_unset)
-                    };
-
-                    if exp_is_zero {
+    let this = move |bases: S,
+                     density_map: D,
+                     exponents: Arc<Vec<<G::Scalar as PrimeField>::ReprBits>>,
+                     skip: u32|
+          -> Result<_, SynthesisError> {
+        // Accumulate the result
+        let mut acc = G::identity();
+
+        // Build a source for the bases
+        let mut bases = bases.new();
+
+        // Create space for the buckets
+        let mut buckets = vec![G::identity(); (1 << c) - 1];
+
+        // only the first round uses this
+        let handle_trivial = skip == 0;
+
+        // Sort the bases into buckets
+        for (exp, density) in exponents.iter().zip(density_map.as_ref().iter()) {
+            if density {
+                let (exp_is_zero, exp_is_one) = {
+                    let (first, rest) = exp.split_first().unwrap();
+                    let rest_unset = rest.not_any();
+                    (!*first && rest_unset, *first && rest_unset)
+                };
+
+                if exp_is_zero {
+                    bases.skip(1)?;
+                } else if exp_is_one {
+                    if handle_trivial {
+                        acc.add_assign_from_source(&mut bases)?;
+                    } else {
                         bases.skip(1)?;
-                    } else if exp_is_one {
-                        if handle_trivial {
-                            acc.add_assign_from_source(&mut bases)?;
-                        } else {
-                            bases.skip(1)?;
-                        }
+                    }
+                } else {
+                    let exp = exp
+                        .into_iter()
+                        .by_val()
+                        .skip(skip as usize)
+                        .take(c as usize)
+                        .enumerate()
+                        .fold(0u64, |acc, (i, b)| acc + ((b as u64) << i));
+
+                    if exp != 0 {
+                        (&mut buckets[(exp - 1) as usize]).add_assign_from_source(&mut bases)?;
                     } else {
-                        let exp = exp
-                            .into_iter()
-                            .by_val()
-                            .skip(skip as usize)
-                            .take(c as usize)
-                            .enumerate()
-                            .fold(0u64, |acc, (i, b)| acc + ((b as u64) << i));
-
-                        if exp != 0 {
-                            (&mut buckets[(exp - 1) as usize])
-                                .add_assign_from_source(&mut bases)?;
-                        } else {
-                            bases.skip(1)?;
-                        }
+                        bases.skip(1)?;
                     }
                 }
             }
+        }
 
-            // Summation by parts
-            // e.g. 3a + 2b + 1c = a +
-            //                    (a) + b +
-            //                    ((a) + b) + c
-            let mut running_sum = G::identity();
-            for exp in buckets.into_iter().rev() {
-                running_sum.add_assign(&exp);
-                acc.add_assign(&running_sum);
-            }
+        // Summation by parts
+        // e.g. 3a + 2b + 1c = a +
+        //                    (a) + b +
+        //                    ((a) + b) + c
+        let mut running_sum = G::identity();
+        for exp in buckets.into_iter().rev() {
+            running_sum.add_assign(&exp);
+            acc.add_assign(&running_sum);
+        }
 
-            Ok(acc)
-        })
+        Ok(acc)
     };
 
-    skip += c;
-
-    if skip >= G::Scalar::NUM_BITS {
-        // There isn't another region.
-        Box::new(this)
-    } else {
-        // There's another region more significant. Calculate and join it with
-        // this region recursively.
-        Box::new(
-            this.join(multiexp_inner(
-                pool,
-                bases,
-                density_map,
-                exponents,
-                skip,
-                c,
-                false,
-            ))
-            .map(move |(this, mut higher): (_, G)| {
-                for _ in 0..c {
-                    higher = higher.double();
-                }
-
-                higher.add_assign(&this);
-
-                higher
-            }),
-        )
-    }
+    let parts = (0..G::Scalar::NUM_BITS)
+        .into_par_iter()
+        .step_by(c as usize)
+        .map(|skip| this(bases.clone(), density_map.clone(), exponents.clone(), skip))
+        .collect::<Vec<Result<_, _>>>();
+
+    parts
+        .into_iter()
+        .rev()
+        .try_fold(G::identity(), |acc, part| {
+            part.map(|part| (0..c).fold(acc, |acc, _| acc.double()) + part)
+        })
 }
 
 /// Perform multi-exponentiation. The caller is responsible for ensuring the
@@ -265,7 +250,7 @@ pub fn multiexp<Q, D, G, S>(
     bases: S,
     density_map: D,
     exponents: Arc<Vec<<G::Scalar as PrimeField>::ReprBits>>,
-) -> Box<dyn Future<Item = G, Error = SynthesisError>>
+) -> Waiter<Result<G, SynthesisError>>
 where
     for<'a> &'a Q: QueryDensity,
     D: Send + Sync + 'static + Clone + AsRef<Q>,
@@ -286,7 +271,7 @@ where
         assert!(query_size == exponents.len());
     }
 
-    multiexp_inner(pool, bases, density_map, exponents, 0, c, true)
+    pool.compute(move || multiexp_inner(bases, density_map, exponents, c))
 }
 
 #[cfg(feature = "pairing")]
@@ -331,7 +316,6 @@ fn test_with_bls12() {
     let naive: <Bls12 as Engine>::G1 = naive_multiexp(g.clone(), v.clone());
 
     let pool = Worker::new();
-
     let fast = multiexp(&pool, (g, 0), FullDensity, v_bits).wait().unwrap();
 
     assert_eq!(naive, fast);
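Note: the rewrite above replaces the recursive future-joining with a data-parallel map over bit windows followed by a serial combine. To see what that combine computes, here is a toy rendition with `u64` arithmetic standing in for group operations (illustrative only; the real code maps the windows across the rayon pool and folds `Result`s):

```rust
fn toy_multiexp(pairs: &[(u64, u64)], c: u32, num_bits: u32) -> u64 {
    // One partial sum per c-bit window of the exponents, analogous to the
    // `this` closure: window w contributes sum(base * bits_w(exp)).
    let parts: Vec<u64> = (0..num_bits)
        .step_by(c as usize)
        .map(|skip| {
            pairs
                .iter()
                .map(|&(base, exp)| base * ((exp >> skip) & ((1u64 << c) - 1)))
                .sum()
        })
        .collect();

    // Combine most-significant window first, doubling c times before each
    // lower window is added, mirroring the diff's try_fold.
    parts
        .into_iter()
        .rev()
        .fold(0, |acc, part| (0..c).fold(acc, |acc, _| acc * 2) + part)
}

fn main() {
    // 3*21 + 2*9 = 81, recovered from two 4-bit windows of 8-bit exponents.
    let pairs = [(3u64, 21u64), (2, 9)];
    assert_eq!(toy_multiexp(&pairs, 4, 8), 3 * 21 + 2 * 9);
}
```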