Skip to content

Commit

Permalink
FEAT: Implement generic parallel collect
Browse files Browse the repository at this point in the history
Allow non-Copy elements by implementing dropping of partial results in
collect (needed if a panic with unwinding occurs during the
apply-collect process).

It is implemented by:

1. allocate an uninit output array of the right size and layout
2. use parallelsplits to split the Zip into chunks processed in parallel
3. for each chunk keep track of the slice of written elements
4. each output chunk is contiguous due to the layout being picked to
match the Zip's preferred layout
5. Use reduce to merge adjacent partial results; this ensures that all
partial results are dropped correctly if there is a panic in any thread
  • Loading branch information
bluss committed May 11, 2020
1 parent 642e4cc commit 24f0b20
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 6 deletions.
126 changes: 120 additions & 6 deletions src/parallel/impl_par_methods.rs
Expand Up @@ -2,6 +2,8 @@ use crate::{Array, ArrayBase, DataMut, Dimension, IntoNdProducer, NdProducer, Zi
use crate::AssignElem;

use crate::parallel::prelude::*;
use crate::parallel::par::ParallelSplits;
use super::send_producer::SendProducer;

/// # Parallel methods
///
Expand Down Expand Up @@ -43,6 +45,8 @@ where

// Zip

const COLLECT_MAX_PARTS: usize = 256;

macro_rules! zip_impl {
($([$notlast:ident $($p:ident)*],)+) => {
$(
Expand Down Expand Up @@ -71,14 +75,56 @@ macro_rules! zip_impl {
/// inputs.
///
/// If all inputs are c- or f-order respectively, that is preserved in the output.
///
/// Restricted to functions that produce copyable results for technical reasons; other
/// cases are not yet implemented.
pub fn par_apply_collect<R>(self, f: impl Fn($($p::Item,)* ) -> R + Sync + Send) -> Array<R, D>
where R: Copy + Send
pub fn par_apply_collect<R>(self, f: impl Fn($($p::Item,)* ) -> R + Sync + Send)
-> Array<R, D>
where R: Send
{
let mut output = self.uninitalized_for_current_layout::<R>();
self.par_apply_assign_into(&mut output, f);
let total_len = output.len();

// Create a parallel iterator that produces chunks of the zip with the output
// array. It's crucial that both parts split in the same way, and in a way
// so that the chunks of the output are still contig.
//
// Use a raw view so that we can alias the output data here and in the partial
// result.
let splits = unsafe {
ParallelSplits {
iter: self.and(SendProducer::new(output.raw_view_mut().cast::<R>())),
// Keep it from splitting the Zip down too small
min_size: total_len / COLLECT_MAX_PARTS,
}
};

let collect_result = splits.map(move |zip| {
// Create a partial result for the contiguous slice of data being written to
let output = zip.last_producer();
debug_assert!(output.is_contiguous());

let mut partial = Partial::new(output.as_ptr());

// Apply the mapping function on this chunk of the zip
let partial_len = &mut partial.len;
let f = &f;
zip.apply(move |$($p,)* output_elem: *mut R| unsafe {
output_elem.write(f($($p),*));
if std::mem::needs_drop::<R>() {
*partial_len += 1;
}
});

partial
})
.reduce(Partial::stub, Partial::try_merge);

if std::mem::needs_drop::<R>() {
debug_assert_eq!(total_len, collect_result.len, "collect len is not correct, expected {}", total_len);
assert!(collect_result.len == total_len, "Collect: Expected number of writes not completed");
}

// Here the collect result is complete, and we release its ownership and transfer
// it to the output array.
collect_result.release_ownership();
unsafe {
output.assume_init()
}
Expand Down Expand Up @@ -113,3 +159,71 @@ zip_impl! {
[true P1 P2 P3 P4 P5],
[false P1 P2 P3 P4 P5 P6],
}

/// Partial is a partially written contiguous slice of data;
/// it is the owner of the elements, but not the allocation,
/// and will drop the elements on drop.
#[must_use]
pub(crate) struct Partial<T> {
    /// Data pointer to the first element owned by this partial result
    ptr: *mut T,
    /// Current length (number of initialized elements starting at `ptr`)
    len: usize,
}

impl<T> Partial<T> {
    /// Create an empty partial for this data pointer
    pub(crate) fn new(ptr: *mut T) -> Self {
        Self {
            ptr,
            len: 0,
        }
    }

    /// Create an empty placeholder partial with a null pointer;
    /// used as the identity element for `reduce` (see `try_merge`).
    pub(crate) fn stub() -> Self {
        // Use the dedicated null constructor instead of `0 as *mut _`.
        Self { len: 0, ptr: std::ptr::null_mut() }
    }

    /// Whether this partial is the `stub()` placeholder (null data pointer).
    pub(crate) fn is_stub(&self) -> bool {
        self.ptr.is_null()
    }

    /// Release Partial's ownership of the written elements, and return the current length
    ///
    /// After this call the elements will not be dropped by `Partial`'s `Drop`;
    /// the caller takes over responsibility for them.
    pub(crate) fn release_ownership(mut self) -> usize {
        let ret = self.len;
        // Setting len to 0 makes the subsequent implicit drop of `self` a no-op.
        self.len = 0;
        ret
    }

    /// Merge if they are in order (left to right) and contiguous.
    /// Skips merge if T does not need drop.
    pub(crate) fn try_merge(mut left: Self, right: Self) -> Self {
        if !std::mem::needs_drop::<T>() {
            // No elements need dropping, so tracking the written region is unnecessary.
            return left;
        }
        // Merge the partial collect results; the final result will be a slice that
        // covers the whole output.
        if left.is_stub() {
            right
        } else if left.ptr.wrapping_add(left.len) == right.ptr {
            // Adjacent and in order: extend `left` and disarm `right`'s drop.
            left.len += right.release_ownership();
            left
        } else {
            // failure to merge; this is a bug in collect, so we will never reach this
            debug_assert!(false, "Partial: failure to merge left and right parts");
            left
        }
    }
}

// SAFETY: `Partial` owns (only) elements of type `T`, so it is `Send` exactly
// when `T` is.
unsafe impl<T> Send for Partial<T> where T: Send { }

impl<T> Drop for Partial<T> {
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            // SAFETY: `ptr..ptr + len` is a contiguous run of initialized
            // elements owned by this value; a stub (null ptr) is skipped above.
            unsafe {
                std::ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len));
            }
        }
    }
}
1 change: 1 addition & 0 deletions src/parallel/mod.rs
Expand Up @@ -157,4 +157,5 @@ pub use crate::par_azip;
mod impl_par_methods;
mod into_impls;
mod par;
mod send_producer;
mod zipmacro;
83 changes: 83 additions & 0 deletions src/parallel/send_producer.rs
@@ -0,0 +1,83 @@

use crate::imp_prelude::*;
use crate::{Layout, NdProducer};
use std::ops::{Deref, DerefMut};

/// An NdProducer that is unconditionally `Send`.
#[repr(transparent)]
pub(crate) struct SendProducer<T> {
    inner: T
}

impl<T> SendProducer<T> {
    /// Wrap a producer so that it is treated as `Send` regardless of `T`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that it is sound to move the wrapped producer
    /// to another thread, even though `T` itself may not be `Send`.
    pub(crate) unsafe fn new(producer: T) -> Self {
        Self { inner: producer }
    }
}

// SAFETY: upheld by the caller of the unsafe constructor `SendProducer::new`.
unsafe impl<P> Send for SendProducer<P> { }

impl<P> Deref for SendProducer<P> {
    type Target = P;

    fn deref(&self) -> &P {
        &self.inner
    }
}

impl<P> DerefMut for SendProducer<P> {
    fn deref_mut(&mut self) -> &mut P {
        &mut self.inner
    }
}

/// Forwarding `NdProducer` implementation: every operation delegates to the
/// wrapped producer, and `split_at` re-wraps both halves so the results stay
/// unconditionally `Send`.
//
// Fix: the original declared the `P: NdProducer` bound twice (both inline and
// in the `where` clause); the redundant inline bound is removed.
impl<P> NdProducer for SendProducer<P>
    where P: NdProducer,
{
    type Item = P::Item;
    type Dim = P::Dim;
    type Ptr = P::Ptr;
    type Stride = P::Stride;

    private_impl! {}

    #[inline(always)]
    fn raw_dim(&self) -> Self::Dim {
        self.inner.raw_dim()
    }

    #[inline(always)]
    fn equal_dim(&self, dim: &Self::Dim) -> bool {
        self.inner.equal_dim(dim)
    }

    #[inline(always)]
    fn as_ptr(&self) -> Self::Ptr {
        self.inner.as_ptr()
    }

    #[inline(always)]
    fn layout(&self) -> Layout {
        self.inner.layout()
    }

    #[inline(always)]
    unsafe fn as_ref(&self, ptr: Self::Ptr) -> Self::Item {
        self.inner.as_ref(ptr)
    }

    #[inline(always)]
    unsafe fn uget_ptr(&self, i: &Self::Dim) -> Self::Ptr {
        self.inner.uget_ptr(i)
    }

    #[inline(always)]
    fn stride_of(&self, axis: Axis) -> Self::Stride {
        self.inner.stride_of(axis)
    }

    #[inline(always)]
    fn contiguous_stride(&self) -> Self::Stride {
        self.inner.contiguous_stride()
    }

    fn split_at(self, axis: Axis, index: usize) -> (Self, Self) {
        let (a, b) = self.inner.split_at(axis, index);
        (Self { inner: a }, Self { inner: b })
    }
}

0 comments on commit 24f0b20

Please sign in to comment.