added by_blocks method #831

Merged
11 commits merged on Feb 27, 2024
199 changes: 190 additions & 9 deletions rayon-demo/src/find/mod.rs
@@ -1,4 +1,4 @@
/// Simple benchmarks of `find_any()` performance
/// Simple benchmarks of `find_any()` and `find_first()` performance

macro_rules! make_tests {
($n:expr, $m:ident) => {
@@ -21,56 +21,237 @@ macro_rules! make_tests {
.collect()
});

// this is a very dumb find_first algorithm.
// no early aborts so we have a linear best case cost.
fn find_dumb<I: ParallelIterator, P: Fn(&I::Item) -> bool + Send + Sync>(
iter: I,
cond: P,
) -> Option<I::Item> {
iter.map(|e| if cond(&e) { Some(e) } else { None })
.reduce(|| None, |left, right| left.or(right))
}

#[bench]
fn parallel_find_any_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first(b: &mut Bencher) {
fn parallel_find_first_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn serial_find_first(b: &mut Bencher) {
fn parallel_find_first_blocks_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_last(b: &mut Bencher) {
fn parallel_find_any_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}
#[bench]
fn parallel_find_first_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}
#[bench]
fn parallel_find_first_blocks_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_last(b: &mut Bencher) {
fn serial_find_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_middle(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
fn parallel_find_any_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_dumb_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(
|| assert!(find_dumb(HAYSTACK.par_iter(), |&&x| x[0] == needle).is_some()),
);
}

#[bench]
fn parallel_find_first_blocks_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_any_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_dumb_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(
|| assert!(find_dumb(HAYSTACK.par_iter(), |&&x| x[0] == needle).is_some()),
);
}

#[bench]
fn parallel_find_first_blocks_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_any_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_first_blocks_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_missing(b: &mut Bencher) {
fn parallel_find_any_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_none()));
}

#[bench]
fn parallel_find_first_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_none())
});
}

#[bench]
fn parallel_find_first_blocks_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_none())
});
}

#[bench]
fn serial_find_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_none()));
}

#[bench]
fn parallel_find_common(b: &mut Bencher) {
fn parallel_find_any_common(b: &mut Bencher) {
b.iter(|| {
assert!(HAYSTACK
.par_iter()
2 changes: 2 additions & 0 deletions src/compile_fail/must_use.rs
@@ -30,6 +30,8 @@ macro_rules! must_use {
}

must_use! {
by_exponential_blocks /** v.par_iter().by_exponential_blocks(); */
by_uniform_blocks /** v.par_iter().by_uniform_blocks(2); */
step_by /** v.par_iter().step_by(2); */
chain /** v.par_iter().chain(&v); */
chunks /** v.par_iter().chunks(2); */
131 changes: 131 additions & 0 deletions src/iter/blocks.rs
@@ -0,0 +1,131 @@
use super::plumbing::*;
use super::*;

struct BlocksCallback<S, C> {
sizes: S,
consumer: C,
len: usize,
}

impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
where
C: UnindexedConsumer<T>,
S: Iterator<Item = usize>,
{
type Output = C::Result;

fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
let mut remaining_len = self.len;
let mut consumer = self.consumer;

// we need a local variable for the accumulated results
// we call the reducer's identity by splitting at 0
let (left_consumer, right_consumer, _) = consumer.split_at(0);
let mut leftmost_res = left_consumer.into_folder().complete();
consumer = right_consumer;

// now we loop on each block size
while remaining_len > 0 && !consumer.full() {
// we compute the next block's size
let size = self.sizes.next().unwrap_or(std::usize::MAX);
Member:

There's a design choice here when sizes.next() returns None, with at least three options:

  • Consume the remainder as one entire block, which is what you have now.
  • Run the remainder for side effects, without feeding it into our consumer. Skip does this with its skipped prefix to match the semantics of std::iter::Skip.
  • Ignore the remainder altogether, basically acting like the suffix of Take.

Did you consider this? I think behavior like Take might actually be a nice choice.

Another question is whether size == 0 should be allowed. At the very least, it would be a wasted split here, which could just be considered the user's poor choice. Or we could make that impossible by having S produce NonZeroUsize items, but that makes it more annoying to write the iterator.

Member:

Hmm. It seems quite surprising to me to ignore some of the elements. I would definitely not expect that. Ultimately, I think we want to offer users a choice of how to manage this -- another behavior I could imagine would be "go on using the last block size that got returned until we are complete".

I am wondering if we can express these semantics with some composed operators? I'd sort of like to say something like .by_blocks().skip_remainder() or something. We could just add inherent methods to the return type to allow users to change back and forth between those modes, no?

Member:

It seems to me that the user can express most of those semantics in the iterator itself:

  • current MAX behavior: my_iter.chain(iter::once(usize::MAX))
  • repeat behavior: my_iter.chain(iter::repeat(last))
  • my suggested Take behavior: just my_iter, and let its None be the end
  • run remainder without consuming: ???

We could just add inherent methods to the return type to allow users to change back and forth between those modes, no?

Yeah, that's possible too.
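
A minimal sketch of these compositions, assuming a hypothetical general by_blocks(sizes) adapter that accepts any Iterator<Item = usize> (the merged API only exposes by_uniform_blocks and by_exponential_blocks, so the adapter name is illustrative):

use std::iter;

fn main() {
    // user-chosen block sizes: 64, 128, 256, after which the iterator ends
    let sizes = || [64usize, 128, 256].into_iter();

    // current MAX behavior: once `sizes` runs out, the remainder becomes one big block
    let _rest_as_one_block = sizes().chain(iter::once(usize::MAX));

    // "repeat" behavior: keep reusing the last size until the input is exhausted
    let _repeat_last = sizes().chain(iter::repeat(256));

    // Take-like behavior: just the sizes themselves; when they end, processing stops early
    let _stop_at_none = sizes();
}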

Member:

Another option might be asserting that there is no remainder.

Member:

I think at minimum we should document these techniques, and I might prefer a convenient way to express them.

Contributor (author):

If we only allow constant sizes and doubling sizes in the public interface, then the choice of what to do when the size iterator ends is a private concern.
I used to have both versions: one acting like an implicit take when the sizes run out, and the other forcing everything to be consumed. So there is indeed something to choose here.

I would favor consuming it all, since I ran into some bugs where elements were missed.

Contributor (author):

Also, I have some questions:

  • Should I add unit tests? Where?
  • Should I add benches? Where?

Member:

I would be in favor of only allowing the two options until we know there is demand for something more general. As for unit tests, we typically just add #[test] in the modules, I think? Alternatively, you can add tests into the tests directory but we don't usually do that. (Is this right, @cuviper?)

For benchmarks, we generally modify the rayon-bench project.

Member:

For tests, there are a few generic sanity checks to add: tests/clones.rs, tests/debug.rs, and src/compile_fail/must_use.rs. Beyond that, adding unit #[test] functions directly in the module is fine, or breaking them into a tests submodule if there are a lot. Doc-test examples are also nice, since they serve both documentation and testing roles.
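
For instance, a minimal sanity test in the style described above might look like the following (the predicate, block size, and test location are illustrative, not part of this PR):

use rayon::prelude::*;

#[test]
fn uniform_blocks_find_first_matches_serial() {
    let v: Vec<u32> = (0..10_000).collect();
    // the first element (other than 0) divisible by 617 is 617 itself
    let expected = v.iter().find(|&&x| x > 0 && x % 617 == 0).copied();
    // find_first through uniform blocks should agree with the sequential result
    let found = v
        .par_iter()
        .by_uniform_blocks(128)
        .find_first(|&&x| x > 0 && x % 617 == 0)
        .copied();
    assert_eq!(found, expected);
}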

let capped_size = remaining_len.min(size);
remaining_len -= capped_size;

// split the producer
let (left_producer, right_producer) = producer.split_at(capped_size);
producer = right_producer;

// split the consumer
let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
consumer = right_consumer;

leftmost_res = consumer.to_reducer().reduce(
leftmost_res,
bridge_producer_consumer(capped_size, left_producer, left_consumer),
);
}
leftmost_res
}
}

/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of exponentially increasing size.
///
/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`].
///
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
base: I,
}

impl<I> ExponentialBlocks<I> {
pub(super) fn new(base: I) -> Self {
Self { base }
}
}

impl<I> ParallelIterator for ExponentialBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let first = crate::current_num_threads();
let callback = BlocksCallback {
consumer,
sizes: std::iter::successors(Some(first), exponential_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}

fn exponential_size(size: &usize) -> Option<usize> {
Some(size.saturating_mul(2))
}

/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of constant size.
///
/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`].
///
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
base: I,
block_size: usize,
}

impl<I> UniformBlocks<I> {
pub(super) fn new(base: I, block_size: usize) -> Self {
Self { base, block_size }
}
}

impl<I> ParallelIterator for UniformBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let callback = BlocksCallback {
consumer,
sizes: std::iter::repeat(self.block_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}
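
For context, a minimal usage sketch of the two adapters defined in this file, mirroring the rayon-demo benchmarks above (data, needle, and block size are illustrative):

use rayon::prelude::*;

fn main() {
    let haystack: Vec<u64> = (0..1_000_000).collect();
    let needle: u64 = 42;

    // Exponential blocks: the first block holds roughly current_num_threads() items,
    // and each following block doubles in size. find_first can stop after an early block,
    // so a match near the front is cheap while a match near the back stays fully parallel.
    let exp = haystack
        .par_iter()
        .by_exponential_blocks()
        .find_first(|&&x| x == needle);

    // Uniform blocks: every block has the same fixed size, trading adaptivity for predictability.
    let uni = haystack
        .par_iter()
        .by_uniform_blocks(10_000)
        .find_first(|&&x| x == needle);

    assert_eq!(exp, Some(&needle));
    assert_eq!(uni, Some(&needle));
}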