Merge pull request #69 from zkcrypto/fil-multicore

Multicore improvements

str4d committed Sep 2, 2021
2 parents c0c013e + 2c96556 commit e6b2fd8
Showing 8 changed files with 242 additions and 165 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/ci.yml
@@ -31,6 +31,29 @@ jobs:
command: test
args: --verbose --release

build:
name: Build target ${{ matrix.target }}
runs-on: ubuntu-latest
strategy:
matrix:
target:
- wasm32-wasi

steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.51.0
override: true
- name: Add target
run: rustup target add ${{ matrix.target }}
- name: cargo fetch
uses: actions-rs/cargo@v1
with:
command: fetch
- name: Build for target
run: cargo build --verbose --no-default-features --target ${{ matrix.target }}

bitrot:
name: Bitrot check
runs-on: ubuntu-latest
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,25 @@ and this project adheres to Rust's notion of
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- `bellman` now uses `rayon` for multithreading when the (default) `multicore`
feature flag is enabled. When this flag is enabled, the `RAYON_NUM_THREADS`
environment variable controls the number of threads that `bellman` will use.
The default, which has not changed, is to use the same number of threads as
logical CPUs.
- `bellman::multicore::Waiter`

### Changed
- `bellman::multicore` has migrated from `crossbeam` to `rayon`:
- `bellman::multicore::Worker::compute` now returns
`bellman::multicore::Waiter`.
- `bellman::multiexp::multiexp` now returns
`bellman::multicore::Waiter<Result<G, SynthesisError>>` instead of
`Box<dyn Future<Item = G, Error = SynthesisError>>`.
- `bellman::multicore::log_num_cpus` is renamed to `log_num_threads`.

### Removed
- `bellman::multicore::WorkerFuture` (replaced by `Waiter`).

## [0.10.0] - 2021-06-04
### Added
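To illustrate the API change described in the Unreleased changelog entries above, here is a minimal usage sketch of the rayon-backed `Worker`/`Waiter` interface, assuming the default `multicore` feature is enabled; the closure body and values are illustrative and not part of this commit.

```rust
use bellman::multicore::Worker;

fn main() {
    // By default bellman uses as many rayon threads as logical CPUs;
    // RAYON_NUM_THREADS can be set in the environment to override this.
    let worker = Worker::new();

    // `compute` now returns a `Waiter` instead of a boxed future.
    let waiter = worker.compute(|| {
        // Stand-in for some CPU-bound work.
        (0u64..1_000).sum::<u64>()
    });

    // `wait` blocks until the closure has finished. Per this commit, calling
    // `wait` from inside the rayon thread pool panics to avoid deadlocks.
    assert_eq!(waiter.wait(), 499_500);
}
```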
13 changes: 8 additions & 5 deletions Cargo.toml
@@ -16,16 +16,19 @@ edition = "2018"
bitvec = "0.22"
blake2s_simd = "0.5"
ff = "0.10"
futures = "0.1"
futures-cpupool = { version = "0.1", optional = true }
group = "0.10"
num_cpus = { version = "1", optional = true }
crossbeam = { version = "0.7", optional = true }
pairing = { version = "0.20", optional = true }
rand_core = "0.6"
byteorder = "1"
subtle = "2.2.1"

# Multicore dependencies
crossbeam-channel = { version = "0.5.1", optional = true }
lazy_static = { version = "1.4.0", optional = true }
log = { version = "0.4", optional = true }
num_cpus = { version = "1", optional = true }
rayon = { version = "1.5.1", optional = true }

[dev-dependencies]
bls12_381 = "0.5"
criterion = "0.3"
@@ -36,7 +39,7 @@ sha2 = "0.9"

[features]
groth16 = ["pairing"]
multicore = ["futures-cpupool", "crossbeam", "num_cpus"]
multicore = ["crossbeam-channel", "lazy_static", "log", "num_cpus", "rayon"]
default = ["groth16", "multicore"]

[[test]]
1 change: 0 additions & 1 deletion benches/slow.rs
@@ -5,7 +5,6 @@ use bellman::{
use bls12_381::{Bls12, Scalar};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use ff::{Field, PrimeFieldBits};
use futures::Future;
use group::{Curve, Group};
use pairing::Engine;
use rand_core::SeedableRng;
2 changes: 1 addition & 1 deletion src/domain.rs
@@ -259,7 +259,7 @@ impl<S: PrimeField> Group<S> for Scalar<S> {
}

fn best_fft<S: PrimeField, T: Group<S>>(a: &mut [T], worker: &Worker, omega: &S, log_n: u32) {
let log_cpus = worker.log_num_cpus();
let log_cpus = worker.log_num_threads();

if log_n <= log_cpus {
serial_fft(a, omega, log_n);
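The `best_fft` change above relies on `Worker::log_num_threads`, which (per the multicore.rs diff further down) returns the floor of log2 of the current rayon thread count. The `log2_floor` helper itself lies outside this diff; the following is a plausible sketch of such a helper, stated as an assumption rather than the crate's exact code.

```rust
/// Floor of log2(num); e.g. 6 threads -> 2, 8 threads -> 3.
fn log2_floor(num: usize) -> u32 {
    assert!(num > 0);
    let mut pow = 0;
    while (1 << (pow + 1)) <= num {
        pow += 1;
    }
    pow
}

fn main() {
    assert_eq!(log2_floor(1), 0);
    assert_eq!(log2_floor(6), 2);
    assert_eq!(log2_floor(8), 3);
}
```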
2 changes: 0 additions & 2 deletions src/groth16/prover.rs
@@ -2,8 +2,6 @@ use rand_core::RngCore;
use std::ops::{AddAssign, MulAssign};
use std::sync::Arc;

use futures::Future;

use ff::{Field, PrimeField, PrimeFieldBits};
use group::{prime::PrimeCurveAffine, Curve};
use pairing::Engine;
175 changes: 113 additions & 62 deletions src/multicore.rs
@@ -1,81 +1,119 @@
//! An interface for dealing with the kinds of parallel computations involved in
//! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and
//! [`crossbeam`] but may be extended in the future to allow for various
//! parallelism strategies.
//!
//! [`CpuPool`]: futures_cpupool::CpuPool
//! `bellman`. It's currently just a thin wrapper around [`rayon`] but may be
//! extended in the future to allow for various parallelism strategies.

#[cfg(feature = "multicore")]
mod implementation {
use crossbeam::{self, thread::Scope};
use futures::{Future, IntoFuture, Poll};
use futures_cpupool::{CpuFuture, CpuPool};
use num_cpus;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
pub struct Worker {
cpus: usize,
pool: CpuPool,
use crossbeam_channel::{bounded, Receiver};
use lazy_static::lazy_static;
use log::{error, trace};
use rayon::current_num_threads;

static WORKER_SPAWN_COUNTER: AtomicUsize = AtomicUsize::new(0);

lazy_static! {
// See Worker::compute below for a description of this.
static ref WORKER_SPAWN_MAX_COUNT: usize = current_num_threads() * 4;
}

impl Worker {
// We don't expose this outside the library so that
// all `Worker` instances have the same number of
// CPUs configured.
pub(crate) fn new_with_cpus(cpus: usize) -> Worker {
Worker {
cpus,
pool: CpuPool::new(cpus),
}
}
#[derive(Clone, Default)]
pub struct Worker {}

impl Worker {
pub fn new() -> Worker {
Self::new_with_cpus(num_cpus::get())
Worker {}
}

pub fn log_num_cpus(&self) -> u32 {
log2_floor(self.cpus)
pub fn log_num_threads(&self) -> u32 {
log2_floor(current_num_threads())
}

pub fn compute<F, R>(&self, f: F) -> WorkerFuture<R::Item, R::Error>
pub fn compute<F, R>(&self, f: F) -> Waiter<R>
where
F: FnOnce() -> R + Send + 'static,
R: IntoFuture + 'static,
R::Future: Send + 'static,
R::Item: Send + 'static,
R::Error: Send + 'static,
R: Send + 'static,
{
WorkerFuture {
future: self.pool.spawn_fn(f),
let (sender, receiver) = bounded(1);

// We keep track here of how many times spawn has been called.
// It can be called without limit, each call queuing a
// request for the thread pool to execute a closure. If we
// allow it to be called without bound, we run the risk of
// memory exhaustion due to the stack space consumed by all
// of the pending closures waiting to be executed.
let previous_count = WORKER_SPAWN_COUNTER.fetch_add(1, Ordering::SeqCst);

// If the number of spawns requested has exceeded the number
// of cores available for processing by some factor (the
// default being 4), then instead of requesting a new thread,
// we execute the closure in the context of a scope call
// (which blocks the current thread) to help clear the
// growing work queue and minimize the chances of memory
// exhaustion.
if previous_count > *WORKER_SPAWN_MAX_COUNT {
let thread_index = rayon::current_thread_index().unwrap_or(0);
rayon::scope(move |_| {
trace!("[{}] switching to scope to help clear backlog [threads: current {}, requested {}]",
thread_index,
current_num_threads(),
WORKER_SPAWN_COUNTER.load(Ordering::SeqCst));
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
});
} else {
rayon::spawn(move || {
let res = f();
sender.send(res).unwrap();
WORKER_SPAWN_COUNTER.fetch_sub(1, Ordering::SeqCst);
});
}

Waiter { receiver }
}

pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R
where
F: FnOnce(&Scope<'a>, usize) -> R,
F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send,
R: Send,
{
let chunk_size = if elements < self.cpus {
let num_threads = current_num_threads();
let chunk_size = if elements < num_threads {
1
} else {
elements / self.cpus
elements / num_threads
};

// TODO: Handle case where threads fail
crossbeam::scope(|scope| f(scope, chunk_size))
.expect("Threads aren't allowed to fail yet")
rayon::scope(|scope| f(scope, chunk_size))
}
}

pub struct WorkerFuture<T, E> {
future: CpuFuture<T, E>,
pub struct Waiter<T> {
receiver: Receiver<T>,
}

impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
type Item = T;
type Error = E;
impl<T> Waiter<T> {
/// Wait for the result.
pub fn wait(&self) -> T {
// This will be Some if this thread is in the global thread pool.
if rayon::current_thread_index().is_some() {
let msg = "wait() cannot be called from within a thread pool since that would lead to deadlocks";
// panic! doesn't necessarily kill the process, so we log as well.
error!("{}", msg);
panic!("{}", msg);
}
self.receiver.recv().unwrap()
}

/// One-off sending.
pub fn done(val: T) -> Self {
let (sender, receiver) = bounded(1);
sender.send(val).unwrap();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.future.poll()
Waiter { receiver }
}
}

@@ -106,8 +144,6 @@ mod implementation {

#[cfg(not(feature = "multicore"))]
mod implementation {
use futures::{future, Future, IntoFuture, Poll};

#[derive(Clone)]
pub struct Worker;

@@ -116,19 +152,16 @@ mod implementation {
Worker
}

pub fn log_num_cpus(&self) -> u32 {
pub fn log_num_threads(&self) -> u32 {
0
}

pub fn compute<F, R>(&self, f: F) -> R::Future
pub fn compute<F, R>(&self, f: F) -> Waiter<R>
where
F: FnOnce() -> R + Send + 'static,
R: IntoFuture + 'static,
R::Future: Send + 'static,
R::Item: Send + 'static,
R::Error: Send + 'static,
R: Send + 'static,
{
f().into_future()
Waiter::done(f())
}

pub fn scope<F, R>(&self, elements: usize, f: F) -> R
@@ -139,16 +172,19 @@ mod implementation {
}
}

pub struct WorkerFuture<T, E> {
future: future::FutureResult<T, E>,
pub struct Waiter<T> {
val: Option<T>,
}

impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
type Item = T;
type Error = E;
impl<T> Waiter<T> {
/// Wait for the result.
pub fn wait(&mut self) -> T {
self.val.take().expect("unmet data dependency")
}

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.future.poll()
/// One-off sending.
pub fn done(val: T) -> Self {
Waiter { val: Some(val) }
}
}

@@ -159,6 +195,21 @@ mod implementation {
f(self);
}
}

/// A fake rayon ParallelIterator that is just a serial iterator.
pub(crate) trait FakeParallelIterator {
type Iter: Iterator<Item = Self::Item>;
type Item: Send;
fn into_par_iter(self) -> Self::Iter;
}

impl FakeParallelIterator for core::ops::Range<u32> {
type Iter = Self;
type Item = u32;
fn into_par_iter(self) -> Self::Iter {
self
}
}
}

pub use self::implementation::*;
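
Finally, to show how the migrated `Worker::scope` above is typically driven, here is a minimal sketch that splits a buffer into roughly per-thread chunks and spawns each chunk on the rayon scope, mirroring the chunking pattern used elsewhere in the crate; the buffer contents and the doubling operation are illustrative assumptions.

```rust
use bellman::multicore::Worker;

fn main() {
    let worker = Worker::new();
    let mut data = vec![1u64; 1024];

    worker.scope(data.len(), |scope, chunk_size| {
        for chunk in data.chunks_mut(chunk_size) {
            // Each chunk is handed to the scope; the spawned closure
            // receives the scope itself as its argument, which we ignore.
            scope.spawn(move |_| {
                for x in chunk.iter_mut() {
                    *x *= 2;
                }
            });
        }
    });

    assert!(data.iter().all(|&x| x == 2));
}
```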
