From ad856bd06f995f097ef8fe473c09a296c79616ab Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 29 Apr 2020 14:26:18 +0200 Subject: [PATCH 01/15] FEAT: Special case D::from_dimension for Ix1 --- src/dimension/dimension_trait.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/dimension/dimension_trait.rs b/src/dimension/dimension_trait.rs index 4f03d3aac..b2ef92e43 100644 --- a/src/dimension/dimension_trait.rs +++ b/src/dimension/dimension_trait.rs @@ -540,6 +540,14 @@ impl Dimension for Dim<[Ix; 1]> { fn try_remove_axis(&self, axis: Axis) -> Self::Smaller { self.remove_axis(axis) } + + fn from_dimension(d: &D2) -> Option { + if 1 == d.ndim() { + Some(Ix1(d[0])) + } else { + None + } + } private_impl! {} } From 511f8b2cc97bb12ce75b8d94945cef5ed80dfad6 Mon Sep 17 00:00:00 2001 From: bluss Date: Fri, 1 May 2020 22:50:26 +0200 Subject: [PATCH 02/15] FIX: Use Zip::fold_while for final reduction in parallel array view When fold_with is used, use Zip::fold_while to fold the array view's parallel iterator. Note that in some cases, the IntoIterator of the view is used instead. --- src/parallel/par.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/parallel/par.rs b/src/parallel/par.rs index efd761acf..6713952b7 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -170,7 +170,14 @@ macro_rules! par_iter_view_wrapper { fn fold_with(self, folder: F) -> F where F: Folder, { - self.into_iter().fold(folder, move |f, elt| f.consume(elt)) + Zip::from(self.0).fold_while(folder, |mut folder, elt| { + folder = folder.consume(elt); + if folder.full() { + FoldWhile::Done(folder) + } else { + FoldWhile::Continue(folder) + } + }).into_inner() } } From 192a166e479cb68bb480a7126fd1233722f73c5c Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 03/15] FIX: Make is_contiguous pub(crate) The reason this method is not yet public, is that it's not accurate (false negatives) for less common layouts. It's correct for C/F i.e row/col major layouts. --- src/impl_methods.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/impl_methods.rs b/src/impl_methods.rs index db502663f..8a906be3d 100644 --- a/src/impl_methods.rs +++ b/src/impl_methods.rs @@ -1293,7 +1293,11 @@ where is_standard_layout(&self.dim, &self.strides) } - fn is_contiguous(&self) -> bool { + /// Return true if the array is known to be contiguous. + /// + /// Will detect c- and f-contig arrays correctly, but otherwise + /// There are some false negatives. 
+ pub(crate) fn is_contiguous(&self) -> bool { D::is_contiguous(&self.dim, &self.strides) } From 35e89f878adcaa4899b567d8deccbe6e426c11a2 Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 04/15] TEST: Add benchmarks for parallel collect --- benches/par_rayon.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/benches/par_rayon.rs b/benches/par_rayon.rs index e8c4cfef3..cca35e00b 100644 --- a/benches/par_rayon.rs +++ b/benches/par_rayon.rs @@ -136,3 +136,39 @@ fn rayon_add(bench: &mut Bencher) { }); }); } + +const COLL_STRING_N: usize = 64; +const COLL_F64_N: usize = 128; + +#[bench] +fn vec_string_collect(bench: &mut test::Bencher) { + let v = vec![""; COLL_STRING_N * COLL_STRING_N]; + bench.iter(|| { + v.iter().map(|s| s.to_owned()).collect::>() + }); +} + +#[bench] +fn array_string_collect(bench: &mut test::Bencher) { + let v = Array::from_elem((COLL_STRING_N, COLL_STRING_N), ""); + bench.iter(|| { + Zip::from(&v).par_apply_collect(|s| s.to_owned()) + }); +} + +#[bench] +fn vec_f64_collect(bench: &mut test::Bencher) { + let v = vec![1.; COLL_F64_N * COLL_F64_N]; + bench.iter(|| { + v.iter().map(|s| s + 1.).collect::>() + }); +} + +#[bench] +fn array_f64_collect(bench: &mut test::Bencher) { + let v = Array::from_elem((COLL_F64_N, COLL_F64_N), 1.); + bench.iter(|| { + Zip::from(&v).par_apply_collect(|s| s + 1.) + }); +} + From 84295c4e893f2ee7e1cd4d95f2ef46bfde9ce0f8 Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 05/15] FEAT: Factor out traits SplitAt and SplitPreference To be used by Zip and parallel Zip --- src/indexes.rs | 3 ++- src/lib.rs | 1 + src/split_at.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ src/zip/mod.rs | 44 +++++++++++++++++++++++++------------------- 4 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 src/split_at.rs diff --git a/src/indexes.rs b/src/indexes.rs index a0831b0e7..6f45fb3de 100644 --- a/src/indexes.rs +++ b/src/indexes.rs @@ -7,7 +7,8 @@ // except according to those terms. 
use super::Dimension; use crate::dimension::IntoDimension; -use crate::zip::{Offset, Splittable}; +use crate::zip::Offset; +use crate::split_at::SplitAt; use crate::Axis; use crate::Layout; use crate::NdProducer; diff --git a/src/lib.rs b/src/lib.rs index a5c1c94ac..b40eee01a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -179,6 +179,7 @@ mod numeric_util; mod shape_builder; #[macro_use] mod slice; +mod split_at; mod stacking; #[macro_use] mod zip; diff --git a/src/split_at.rs b/src/split_at.rs new file mode 100644 index 000000000..da15dfcf3 --- /dev/null +++ b/src/split_at.rs @@ -0,0 +1,49 @@ + +use crate::imp_prelude::*; + +/// Arrays and similar that can be split along an axis +pub(crate) trait SplitAt { + fn split_at(self, axis: Axis, index: usize) -> (Self, Self) where Self: Sized; +} + +pub(crate) trait SplitPreference : SplitAt { + fn can_split(&self) -> bool; + fn size(&self) -> usize; + fn split_preference(&self) -> (Axis, usize); + fn split(self) -> (Self, Self) where Self: Sized { + let (axis, index) = self.split_preference(); + self.split_at(axis, index) + } +} + +impl SplitAt for D +where + D: Dimension, +{ + fn split_at(self, axis: Axis, index: Ix) -> (Self, Self) { + let mut d1 = self; + let mut d2 = d1.clone(); + let i = axis.index(); + let len = d1[i]; + d1[i] = index; + d2[i] = len - index; + (d1, d2) + } +} + +impl<'a, A, D> SplitAt for ArrayViewMut<'a, A, D> + where D: Dimension +{ + fn split_at(self, axis: Axis, index: usize) -> (Self, Self) { + self.split_at(axis, index) + } +} + + +impl SplitAt for RawArrayViewMut + where D: Dimension +{ + fn split_at(self, axis: Axis, index: usize) -> (Self, Self) { + self.split_at(axis, index) + } +} diff --git a/src/zip/mod.rs b/src/zip/mod.rs index 2b6612b54..f319afd4b 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -20,6 +20,7 @@ use crate::NdIndex; use crate::indexes::{indices, Indices}; use crate::layout::{CORDER, FORDER}; +use crate::split_at::{SplitPreference, SplitAt}; use partial_array::PartialArray; @@ -92,25 +93,6 @@ where private_impl! {} } -pub trait Splittable: Sized { - fn split_at(self, axis: Axis, index: Ix) -> (Self, Self); -} - -impl Splittable for D -where - D: Dimension, -{ - fn split_at(self, axis: Axis, index: Ix) -> (Self, Self) { - let mut d1 = self; - let mut d2 = d1.clone(); - let i = axis.index(); - let len = d1[i]; - d1[i] = index; - d2[i] = len - index; - (d1, d2) - } -} - /// Argument conversion into a producer. /// /// Slices and vectors can be used (equivalent to 1-dimensional array views). @@ -1121,9 +1103,31 @@ macro_rules! map_impl { pub fn split(self) -> (Self, Self) { debug_assert_ne!(self.size(), 0, "Attempt to split empty zip"); debug_assert_ne!(self.size(), 1, "Attempt to split zip with 1 elem"); + SplitPreference::split(self) + } + } + + impl SplitPreference for Zip<($($p,)*), D> + where D: Dimension, + $($p: NdProducer ,)* + { + fn can_split(&self) -> bool { self.size() > 1 } + + fn size(&self) -> usize { self.size() } + + fn split_preference(&self) -> (Axis, usize) { // Always split in a way that preserves layout (if any) let axis = self.max_stride_axis(); let index = self.len_of(axis) / 2; + (axis, index) + } + } + + impl SplitAt for Zip<($($p,)*), D> + where D: Dimension, + $($p: NdProducer ,)* + { + fn split_at(self, axis: Axis, index: usize) -> (Self, Self) { let (p1, p2) = self.parts.split_at(axis, index); let (d1, d2) = self.dimension.split_at(axis, index); (Zip { @@ -1139,7 +1143,9 @@ macro_rules! 
map_impl { layout_tendency: self.layout_tendency, }) } + } + )+ } } From 3ab67eb062d5317edfe35ced0d5e24022c6fe2fa Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 06/15] FEAT: Add ParalleIterator ParallelSplits This iterator is for internal use; it produces the splits of a Zip (it splits the Zip the same way as the regular parallel iterator for Zip, but here the whole Zip is the produced item of the iterator.) This is helpful as a building block for other operations. --- src/parallel/par.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/src/parallel/par.rs b/src/parallel/par.rs index 6713952b7..c7a0fcb3a 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -15,6 +15,7 @@ use crate::iter::AxisIter; use crate::iter::AxisIterMut; use crate::Dimension; use crate::{ArrayView, ArrayViewMut}; +use crate::split_at::SplitPreference; /// Parallel iterator wrapper. #[derive(Copy, Clone, Debug)] @@ -250,7 +251,7 @@ macro_rules! zip_impl { type Item = ($($p::Item ,)*); fn split(self) -> (Self, Option) { - if self.0.size() <= 1 { + if !self.0.can_split() { return (self, None) } let (a, b) = self.0.split(); @@ -282,3 +283,53 @@ zip_impl! { [P1 P2 P3 P4 P5], [P1 P2 P3 P4 P5 P6], } + +/// A parallel iterator (unindexed) that produces the splits of the array +/// or producer `P`. +pub(crate) struct ParallelSplits
<P>
{ + pub(crate) iter: P, + pub(crate) min_size: usize, +} + +impl

<P> ParallelIterator for ParallelSplits<P>

+ where P: SplitPreference + Send, +{ + type Item = P; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge_unindexed(self, consumer) + } + + fn opt_len(&self) -> Option { + None + } +} + +impl

<P> UnindexedProducer for ParallelSplits<P>

+ where P: SplitPreference + Send, +{ + type Item = P; + + fn split(self) -> (Self, Option) { + if self.iter.size() <= self.min_size || !self.iter.can_split() { + return (self, None) + } + let (a, b) = self.iter.split(); + (ParallelSplits { + iter: a, + min_size: self.min_size, + }, + Some(ParallelSplits { + iter: b, + min_size: self.min_size, + })) + } + + fn fold_with(self, folder: Fold) -> Fold + where Fold: Folder, + { + folder.consume(self.iter) + } +} From ee97dbfbaf42b3a21d92384f414ec20fce0617dd Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 07/15] TEST: Add parallel collect test for small arrays --- tests/par_zip.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/par_zip.rs b/tests/par_zip.rs index a24dc8e80..36d7f43c2 100644 --- a/tests/par_zip.rs +++ b/tests/par_zip.rs @@ -97,6 +97,28 @@ fn test_zip_collect() { assert_abs_diff_eq!(a, &b + &c, epsilon = 1e-6); assert_eq!(a.strides(), b.strides()); } + +} + +#[test] +#[cfg(feature = "approx")] +fn test_zip_small_collect() { + use approx::assert_abs_diff_eq; + + for m in 0..32 { + for n in 0..16 { + let dim = (m, n); + let b = Array::from_shape_fn(dim, |(i, j)| 1. / (i + 2 * j + 1) as f32); + let c = Array::from_shape_fn(dim, |(i, j)| f32::ln((1 + i + j) as f32)); + + { + let a = Zip::from(&b).and(&c).par_apply_collect(|x, y| x + y); + + assert_abs_diff_eq!(a, &b + &c, epsilon = 1e-6); + assert_eq!(a.strides(), b.strides()); + } + } + } } #[test] From fe2ebf6eb96a191231751ebae65e76ab94476e76 Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 08/15] FEAT: Add internal Zip::last_producer This method is useful for parallel Zip. --- src/zip/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/zip/mod.rs b/src/zip/mod.rs index f319afd4b..af251c235 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -916,6 +916,11 @@ zipt_impl! { [A B C D E F][ a b c d e f], } +macro_rules! last_of { + ($q:ty) => { $q }; + ($p:ty, $($q:ty),+) => { last_of!($($q),+) }; +} + macro_rules! map_impl { ($([$notlast:ident $($p:ident)*],)+) => { $( @@ -1012,6 +1017,14 @@ macro_rules! map_impl { }).is_done() } + #[cfg(feature = "rayon")] + #[allow(dead_code)] // unused for the first of the Zip arities + /// Return a reference to the last producer + pub(crate) fn last_producer(&self) -> &last_of!($($p),*) { + let (.., ref last) = &self.parts; + last + } + expand_if!(@bool [$notlast] /// Include the producer `p` in the Zip. From 42772962313e550904b027fa233ceda45f1a3214 Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 11 May 2020 23:31:18 +0200 Subject: [PATCH 09/15] FEAT: Implement generic parallel collect Allow non-copy elements by implementing dropping partial results from collect (needed if there is a panic with unwinding during the apply-collect process). It is implemented by: 1. allocate an uninit output array of the right size and layout 2. use parallelsplits to split the Zip into chunks processed in parallel 3. for each chunk keep track of the slice of written elements 4. each output chunk is contiguous due to the layout being picked to match the Zip's preferred layout 5. 
Use reduce to merge adjacent partial results; this ensures we drop all the rests correctly, if there is a panic in any thread --- src/parallel/impl_par_methods.rs | 126 +++++++++++++++++++++++++++++-- src/parallel/mod.rs | 1 + src/parallel/send_producer.rs | 83 ++++++++++++++++++++ 3 files changed, 204 insertions(+), 6 deletions(-) create mode 100644 src/parallel/send_producer.rs diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index 867cd07ad..c3292876f 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -2,6 +2,8 @@ use crate::{Array, ArrayBase, DataMut, Dimension, IntoNdProducer, NdProducer, Zi use crate::AssignElem; use crate::parallel::prelude::*; +use crate::parallel::par::ParallelSplits; +use super::send_producer::SendProducer; /// # Parallel methods /// @@ -43,6 +45,8 @@ where // Zip +const COLLECT_MAX_PARTS: usize = 256; + macro_rules! zip_impl { ($([$notlast:ident $($p:ident)*],)+) => { $( @@ -71,14 +75,56 @@ macro_rules! zip_impl { /// inputs. /// /// If all inputs are c- or f-order respectively, that is preserved in the output. - /// - /// Restricted to functions that produce copyable results for technical reasons; other - /// cases are not yet implemented. - pub fn par_apply_collect(self, f: impl Fn($($p::Item,)* ) -> R + Sync + Send) -> Array - where R: Copy + Send + pub fn par_apply_collect(self, f: impl Fn($($p::Item,)* ) -> R + Sync + Send) + -> Array + where R: Send { let mut output = self.uninitalized_for_current_layout::(); - self.par_apply_assign_into(&mut output, f); + let total_len = output.len(); + + // Create a parallel iterator that produces chunks of the zip with the output + // array. It's crucial that both parts split in the same way, and in a way + // so that the chunks of the output are still contig. + // + // Use a raw view so that we can alias the output data here and in the partial + // result. + let splits = unsafe { + ParallelSplits { + iter: self.and(SendProducer::new(output.raw_view_mut().cast::())), + // Keep it from splitting the Zip down too small + min_size: total_len / COLLECT_MAX_PARTS, + } + }; + + let collect_result = splits.map(move |zip| { + // Create a partial result for the contiguous slice of data being written to + let output = zip.last_producer(); + debug_assert!(output.is_contiguous()); + + let mut partial = Partial::new(output.as_ptr()); + + // Apply the mapping function on this chunk of the zip + let partial_len = &mut partial.len; + let f = &f; + zip.apply(move |$($p,)* output_elem: *mut R| unsafe { + output_elem.write(f($($p),*)); + if std::mem::needs_drop::() { + *partial_len += 1; + } + }); + + partial + }) + .reduce(Partial::stub, Partial::try_merge); + + if std::mem::needs_drop::() { + debug_assert_eq!(total_len, collect_result.len, "collect len is not correct, expected {}", total_len); + assert!(collect_result.len == total_len, "Collect: Expected number of writes not completed"); + } + + // Here the collect result is complete, and we release its ownership and transfer + // it to the output array. + collect_result.release_ownership(); unsafe { output.assume_init() } @@ -113,3 +159,71 @@ zip_impl! { [true P1 P2 P3 P4 P5], [false P1 P2 P3 P4 P5 P6], } + +/// Partial is a partially written contiguous slice of data; +/// it is the owner of the elements, but not the allocation, +/// and will drop the elements on drop. 
+#[must_use] +pub(crate) struct Partial { + /// Data pointer + ptr: *mut T, + /// Current length + len: usize, +} + +impl Partial { + /// Create an empty partial for this data pointer + pub(crate) fn new(ptr: *mut T) -> Self { + Self { + ptr, + len: 0, + } + } + + pub(crate) fn stub() -> Self { + Self { len: 0, ptr: 0 as *mut _ } + } + + pub(crate) fn is_stub(&self) -> bool { + self.ptr.is_null() + } + + /// Release Partial's ownership of the written elements, and return the current length + pub(crate) fn release_ownership(mut self) -> usize { + let ret = self.len; + self.len = 0; + ret + } + + /// Merge if they are in order (left to right) and contiguous. + /// Skips merge if T does not need drop. + pub(crate) fn try_merge(mut left: Self, right: Self) -> Self { + if !std::mem::needs_drop::() { + return left; + } + // Merge the partial collect results; the final result will be a slice that + // covers the whole output. + if left.is_stub() { + right + } else if left.ptr.wrapping_add(left.len) == right.ptr { + left.len += right.release_ownership(); + left + } else { + // failure to merge; this is a bug in collect, so we will never reach this + debug_assert!(false, "Partial: failure to merge left and right parts"); + left + } + } +} + +unsafe impl Send for Partial where T: Send { } + +impl Drop for Partial { + fn drop(&mut self) { + if !self.ptr.is_null() { + unsafe { + std::ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len)); + } + } + } +} diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 82cd1cba6..12059cee4 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -155,4 +155,5 @@ pub use crate::par_azip; mod impl_par_methods; mod into_impls; mod par; +mod send_producer; mod zipmacro; diff --git a/src/parallel/send_producer.rs b/src/parallel/send_producer.rs new file mode 100644 index 000000000..5324b3490 --- /dev/null +++ b/src/parallel/send_producer.rs @@ -0,0 +1,83 @@ + +use crate::imp_prelude::*; +use crate::{Layout, NdProducer}; +use std::ops::{Deref, DerefMut}; + +/// An NdProducer that is unconditionally `Send`. +#[repr(transparent)] +pub(crate) struct SendProducer { + inner: T +} + +impl SendProducer { + /// Create an unconditionally `Send` ndproducer from the producer + pub(crate) unsafe fn new(producer: T) -> Self { Self { inner: producer } } +} + +unsafe impl

<P> Send for SendProducer<P>

{ } + +impl

<P> Deref for SendProducer<P>

{ + type Target = P; + fn deref(&self) -> &P { &self.inner } +} + +impl

<P> DerefMut for SendProducer<P>

{ + fn deref_mut(&mut self) -> &mut P { &mut self.inner } +} + +impl NdProducer for SendProducer
<P>
+ where P: NdProducer, +{ + type Item = P::Item; + type Dim = P::Dim; + type Ptr = P::Ptr; + type Stride = P::Stride; + + private_impl! {} + + #[inline(always)] + fn raw_dim(&self) -> Self::Dim { + self.inner.raw_dim() + } + + #[inline(always)] + fn equal_dim(&self, dim: &Self::Dim) -> bool { + self.inner.equal_dim(dim) + } + + #[inline(always)] + fn as_ptr(&self) -> Self::Ptr { + self.inner.as_ptr() + } + + #[inline(always)] + fn layout(&self) -> Layout { + self.inner.layout() + } + + #[inline(always)] + unsafe fn as_ref(&self, ptr: Self::Ptr) -> Self::Item { + self.inner.as_ref(ptr) + } + + #[inline(always)] + unsafe fn uget_ptr(&self, i: &Self::Dim) -> Self::Ptr { + self.inner.uget_ptr(i) + } + + #[inline(always)] + fn stride_of(&self, axis: Axis) -> Self::Stride { + self.inner.stride_of(axis) + } + + #[inline(always)] + fn contiguous_stride(&self) -> Self::Stride { + self.inner.contiguous_stride() + } + + fn split_at(self, axis: Axis, index: usize) -> (Self, Self) { + let (a, b) = self.inner.split_at(axis, index); + (Self { inner: a }, Self { inner: b }) + } +} + From 6d43c133db81f7c32bfd2b20a9ea9d92513d07cf Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 18 May 2020 20:18:00 +0200 Subject: [PATCH 10/15] FIX: In ParallelSplits, count maximum number of splits Instead of requiring to use the size in elements of the thing-to-split, simply use a counter for the number of splits. --- src/parallel/impl_par_methods.rs | 4 ++-- src/parallel/par.rs | 8 ++++---- src/split_at.rs | 1 - src/zip/mod.rs | 2 -- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index c3292876f..f73b952d5 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -45,7 +45,7 @@ where // Zip -const COLLECT_MAX_PARTS: usize = 256; +const COLLECT_MAX_SPLITS: usize = 10; macro_rules! zip_impl { ($([$notlast:ident $($p:ident)*],)+) => { @@ -92,7 +92,7 @@ macro_rules! zip_impl { ParallelSplits { iter: self.and(SendProducer::new(output.raw_view_mut().cast::())), // Keep it from splitting the Zip down too small - min_size: total_len / COLLECT_MAX_PARTS, + max_splits: COLLECT_MAX_SPLITS, } }; diff --git a/src/parallel/par.rs b/src/parallel/par.rs index c7a0fcb3a..d9d592af6 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -288,7 +288,7 @@ zip_impl! { /// or producer `P`. pub(crate) struct ParallelSplits
<P>
{ pub(crate) iter: P, - pub(crate) min_size: usize, + pub(crate) max_splits: usize, } impl

<P> ParallelIterator for ParallelSplits<P>

@@ -313,17 +313,17 @@ impl

<P> UnindexedProducer for ParallelSplits<P>

type Item = P; fn split(self) -> (Self, Option) { - if self.iter.size() <= self.min_size || !self.iter.can_split() { + if self.max_splits == 0 || !self.iter.can_split() { return (self, None) } let (a, b) = self.iter.split(); (ParallelSplits { iter: a, - min_size: self.min_size, + max_splits: self.max_splits - 1, }, Some(ParallelSplits { iter: b, - min_size: self.min_size, + max_splits: self.max_splits - 1, })) } diff --git a/src/split_at.rs b/src/split_at.rs index da15dfcf3..b05e58346 100644 --- a/src/split_at.rs +++ b/src/split_at.rs @@ -8,7 +8,6 @@ pub(crate) trait SplitAt { pub(crate) trait SplitPreference : SplitAt { fn can_split(&self) -> bool; - fn size(&self) -> usize; fn split_preference(&self) -> (Axis, usize); fn split(self) -> (Self, Self) where Self: Sized { let (axis, index) = self.split_preference(); diff --git a/src/zip/mod.rs b/src/zip/mod.rs index af251c235..9196a2663 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -1126,8 +1126,6 @@ macro_rules! map_impl { { fn can_split(&self) -> bool { self.size() > 1 } - fn size(&self) -> usize { self.size() } - fn split_preference(&self) -> (Axis, usize) { // Always split in a way that preserves layout (if any) let axis = self.max_stride_axis(); From e3ebf8c545ba7e5d59ef0da04ae8c909a90b1166 Mon Sep 17 00:00:00 2001 From: bluss Date: Mon, 18 May 2020 20:19:06 +0200 Subject: [PATCH 11/15] FIX: In Zip, fix unused code warning for `last_of` macro --- src/zip/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/zip/mod.rs b/src/zip/mod.rs index 9196a2663..57c0e2dc5 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -916,6 +916,7 @@ zipt_impl! { [A B C D E F][ a b c d e f], } +#[cfg(feature = "rayon")] macro_rules! last_of { ($q:ty) => { $q }; ($p:ty, $($q:ty),+) => { last_of!($($q),+) }; From efcd6074324c18ef21cd2a0ce10c89b2732aef13 Mon Sep 17 00:00:00 2001 From: bluss Date: Tue, 19 May 2020 21:21:15 +0200 Subject: [PATCH 12/15] FIX: Make Partial::new an unsafe method --- src/parallel/impl_par_methods.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index f73b952d5..0ed48cad1 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -100,8 +100,10 @@ macro_rules! zip_impl { // Create a partial result for the contiguous slice of data being written to let output = zip.last_producer(); debug_assert!(output.is_contiguous()); - - let mut partial = Partial::new(output.as_ptr()); + let mut partial; + unsafe { + partial = Partial::new(output.as_ptr()); + } // Apply the mapping function on this chunk of the zip let partial_len = &mut partial.len; @@ -173,7 +175,12 @@ pub(crate) struct Partial { impl Partial { /// Create an empty partial for this data pointer - pub(crate) fn new(ptr: *mut T) -> Self { + /// + /// Safety: Unless ownership is released, the + /// Partial acts as an owner of the slice of data (not the allocation); + /// and will free the elements on drop; the pointer must be dereferenceable + /// and the `len` elements following it valid. 
+ pub(crate) unsafe fn new(ptr: *mut T) -> Self { Self { ptr, len: 0, From 8ed9ac331ebf11f85d18ed963b297ff033bf35c8 Mon Sep 17 00:00:00 2001 From: bluss Date: Tue, 19 May 2020 21:21:32 +0200 Subject: [PATCH 13/15] FIX: Wrap long lines in impl_par_methods.rs --- src/parallel/impl_par_methods.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index 0ed48cad1..0aedee9e8 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -120,8 +120,10 @@ macro_rules! zip_impl { .reduce(Partial::stub, Partial::try_merge); if std::mem::needs_drop::() { - debug_assert_eq!(total_len, collect_result.len, "collect len is not correct, expected {}", total_len); - assert!(collect_result.len == total_len, "Collect: Expected number of writes not completed"); + debug_assert_eq!(total_len, collect_result.len, + "collect len is not correct, expected {}", total_len); + assert!(collect_result.len == total_len, + "Collect: Expected number of writes not completed"); } // Here the collect result is complete, and we release its ownership and transfer From d02b757aca316f96902ebd63ed4d0a71a51acfb3 Mon Sep 17 00:00:00 2001 From: bluss Date: Sat, 23 May 2020 20:04:55 +0200 Subject: [PATCH 14/15] FIX: Use Partial instead of PartialArray Partial is just a contiguous slice, and much simpler than PartialArray; Partial is all that's needed, because the area written will always be contiguous. --- src/lib.rs | 1 + src/parallel/impl_par_methods.rs | 75 +--------------- src/partial.rs | 88 +++++++++++++++++++ src/zip/mod.rs | 21 +++-- src/zip/partial_array.rs | 144 ------------------------------- 5 files changed, 104 insertions(+), 225 deletions(-) create mode 100644 src/partial.rs delete mode 100644 src/zip/partial_array.rs diff --git a/src/lib.rs b/src/lib.rs index b40eee01a..f65e82ec9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,6 +176,7 @@ mod linalg_traits; mod linspace; mod logspace; mod numeric_util; +mod partial; mod shape_builder; #[macro_use] mod slice; diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index 0aedee9e8..e5470e170 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -5,6 +5,8 @@ use crate::parallel::prelude::*; use crate::parallel::par::ParallelSplits; use super::send_producer::SendProducer; +use crate::partial::Partial; + /// # Parallel methods /// /// These methods require crate feature `rayon`. @@ -163,76 +165,3 @@ zip_impl! { [true P1 P2 P3 P4 P5], [false P1 P2 P3 P4 P5 P6], } - -/// Partial is a partially written contiguous slice of data; -/// it is the owner of the elements, but not the allocation, -/// and will drop the elements on drop. -#[must_use] -pub(crate) struct Partial { - /// Data pointer - ptr: *mut T, - /// Current length - len: usize, -} - -impl Partial { - /// Create an empty partial for this data pointer - /// - /// Safety: Unless ownership is released, the - /// Partial acts as an owner of the slice of data (not the allocation); - /// and will free the elements on drop; the pointer must be dereferenceable - /// and the `len` elements following it valid. 
- pub(crate) unsafe fn new(ptr: *mut T) -> Self { - Self { - ptr, - len: 0, - } - } - - pub(crate) fn stub() -> Self { - Self { len: 0, ptr: 0 as *mut _ } - } - - pub(crate) fn is_stub(&self) -> bool { - self.ptr.is_null() - } - - /// Release Partial's ownership of the written elements, and return the current length - pub(crate) fn release_ownership(mut self) -> usize { - let ret = self.len; - self.len = 0; - ret - } - - /// Merge if they are in order (left to right) and contiguous. - /// Skips merge if T does not need drop. - pub(crate) fn try_merge(mut left: Self, right: Self) -> Self { - if !std::mem::needs_drop::() { - return left; - } - // Merge the partial collect results; the final result will be a slice that - // covers the whole output. - if left.is_stub() { - right - } else if left.ptr.wrapping_add(left.len) == right.ptr { - left.len += right.release_ownership(); - left - } else { - // failure to merge; this is a bug in collect, so we will never reach this - debug_assert!(false, "Partial: failure to merge left and right parts"); - left - } - } -} - -unsafe impl Send for Partial where T: Send { } - -impl Drop for Partial { - fn drop(&mut self) { - if !self.ptr.is_null() { - unsafe { - std::ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len)); - } - } - } -} diff --git a/src/partial.rs b/src/partial.rs new file mode 100644 index 000000000..887e93824 --- /dev/null +++ b/src/partial.rs @@ -0,0 +1,88 @@ +// Copyright 2020 bluss and ndarray developers. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::ptr; + +/// Partial is a partially written contiguous slice of data; +/// it is the owner of the elements, but not the allocation, +/// and will drop the elements on drop. +#[must_use] +pub(crate) struct Partial { + /// Data pointer + ptr: *mut T, + /// Current length + pub(crate) len: usize, +} + +impl Partial { + /// Create an empty partial for this data pointer + /// + /// ## Safety + /// + /// Unless ownership is released, the Partial acts as an owner of the slice of data (not the + /// allocation); and will free the elements on drop; the pointer must be dereferenceable and + /// the `len` elements following it valid. + /// + /// The Partial has an accessible length field which must only be modified in trusted code. + pub(crate) unsafe fn new(ptr: *mut T) -> Self { + Self { + ptr, + len: 0, + } + } + + #[cfg(feature = "rayon")] + pub(crate) fn stub() -> Self { + Self { len: 0, ptr: 0 as *mut _ } + } + + #[cfg(feature = "rayon")] + pub(crate) fn is_stub(&self) -> bool { + self.ptr.is_null() + } + + /// Release Partial's ownership of the written elements, and return the current length + pub(crate) fn release_ownership(mut self) -> usize { + let ret = self.len; + self.len = 0; + ret + } + + #[cfg(feature = "rayon")] + /// Merge if they are in order (left to right) and contiguous. + /// Skips merge if T does not need drop. + pub(crate) fn try_merge(mut left: Self, right: Self) -> Self { + if !std::mem::needs_drop::() { + return left; + } + // Merge the partial collect results; the final result will be a slice that + // covers the whole output. 
+ if left.is_stub() { + right + } else if left.ptr.wrapping_add(left.len) == right.ptr { + left.len += right.release_ownership(); + left + } else { + // failure to merge; this is a bug in collect, so we will never reach this + debug_assert!(false, "Partial: failure to merge left and right parts"); + left + } + } +} + +unsafe impl Send for Partial where T: Send { } + +impl Drop for Partial { + fn drop(&mut self) { + if !self.ptr.is_null() { + unsafe { + ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len)); + } + } + } +} diff --git a/src/zip/mod.rs b/src/zip/mod.rs index 57c0e2dc5..f894390e6 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -8,7 +8,6 @@ #[macro_use] mod zipmacro; -mod partial_array; use std::mem::MaybeUninit; @@ -17,13 +16,12 @@ use crate::AssignElem; use crate::IntoDimension; use crate::Layout; use crate::NdIndex; +use crate::partial::Partial; use crate::indexes::{indices, Indices}; use crate::layout::{CORDER, FORDER}; use crate::split_at::{SplitPreference, SplitAt}; -use partial_array::PartialArray; - /// Return if the expression is a break value. macro_rules! fold_while { ($e:expr) => { @@ -1070,7 +1068,7 @@ macro_rules! map_impl { /// inputs. /// /// If all inputs are c- or f-order respectively, that is preserved in the output. - pub fn apply_collect(self, f: impl FnMut($($p::Item,)* ) -> R) -> Array + pub fn apply_collect(self, mut f: impl FnMut($($p::Item,)* ) -> R) -> Array { // Make uninit result let mut output = self.uninitalized_for_current_layout::(); @@ -1078,13 +1076,20 @@ macro_rules! map_impl { // For elements with no drop glue, just overwrite into the array self.apply_assign_into(&mut output, f); } else { - // For generic elements, use a proxy that counts the number of filled elements, + // For generic elements, use a Partial to counts the number of filled elements, // and can drop the right number of elements on unwinding unsafe { - PartialArray::scope(output.view_mut(), move |partial| { - debug_assert_eq!(partial.layout().tendency() >= 0, self.layout_tendency >= 0); - self.apply_assign_into(partial, f); + let mut output = output.raw_view_mut().cast::(); + let mut partial = Partial::new(output.as_mut_ptr()); + let partial_ref = &mut partial; + debug_assert!(output.is_contiguous()); + debug_assert_eq!(output.layout().tendency() >= 0, self.layout_tendency >= 0); + self.and(output) + .apply(move |$($p, )* output_: *mut R| { + output_.write(f($($p ),*)); + partial_ref.len += 1; }); + partial.release_ownership(); } } diff --git a/src/zip/partial_array.rs b/src/zip/partial_array.rs deleted file mode 100644 index d6a2bb9cd..000000000 --- a/src/zip/partial_array.rs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2020 bluss and ndarray developers. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. 
- -use crate::imp_prelude::*; -use crate::{ - AssignElem, - Layout, - NdProducer, - Zip, - FoldWhile, -}; - -use std::cell::Cell; -use std::mem; -use std::mem::MaybeUninit; -use std::ptr; - -/// An assignable element reference that increments a counter when assigned -pub(crate) struct ProxyElem<'a, 'b, A> { - item: &'a mut MaybeUninit, - filled: &'b Cell -} - -impl<'a, 'b, A> AssignElem for ProxyElem<'a, 'b, A> { - fn assign_elem(self, item: A) { - self.filled.set(self.filled.get() + 1); - *self.item = MaybeUninit::new(item); - } -} - -/// Handles progress of assigning to a part of an array, for elements that need -/// to be dropped on unwinding. See Self::scope. -pub(crate) struct PartialArray<'a, 'b, A, D> - where D: Dimension -{ - data: ArrayViewMut<'a, MaybeUninit, D>, - filled: &'b Cell, -} - -impl<'a, 'b, A, D> PartialArray<'a, 'b, A, D> - where D: Dimension -{ - /// Create a temporary PartialArray that wraps the array view `data`; - /// if the end of the scope is reached, the partial array is marked complete; - /// if execution unwinds at any time before them, the elements written until then - /// are dropped. - /// - /// Safety: the caller *must* ensure that elements will be written in `data`'s preferred order. - /// PartialArray can not handle arbitrary writes, only in the memory order. - pub(crate) unsafe fn scope(data: ArrayViewMut<'a, MaybeUninit, D>, - scope_fn: impl FnOnce(&mut PartialArray)) - { - let filled = Cell::new(0); - let mut partial = PartialArray::new(data, &filled); - scope_fn(&mut partial); - filled.set(0); // mark complete - } - - unsafe fn new(data: ArrayViewMut<'a, MaybeUninit, D>, - filled: &'b Cell) -> Self - { - debug_assert_eq!(filled.get(), 0); - Self { data, filled } - } -} - -impl<'a, 'b, A, D> Drop for PartialArray<'a, 'b, A, D> - where D: Dimension -{ - fn drop(&mut self) { - if !mem::needs_drop::() { - return; - } - - let mut count = self.filled.get(); - if count == 0 { - return; - } - - Zip::from(self).fold_while((), move |(), elt| { - if count > 0 { - count -= 1; - unsafe { - ptr::drop_in_place::(elt.item.as_mut_ptr()); - } - FoldWhile::Continue(()) - } else { - FoldWhile::Done(()) - } - }); - } -} - -impl<'a: 'c, 'b: 'c, 'c, A, D: Dimension> NdProducer for &'c mut PartialArray<'a, 'b, A, D> { - // This just wraps ArrayViewMut as NdProducer and maps the item - type Item = ProxyElem<'a, 'b, A>; - type Dim = D; - type Ptr = *mut MaybeUninit; - type Stride = isize; - - private_impl! 
{} - fn raw_dim(&self) -> Self::Dim { - self.data.raw_dim() - } - - fn equal_dim(&self, dim: &Self::Dim) -> bool { - self.data.equal_dim(dim) - } - - fn as_ptr(&self) -> Self::Ptr { - NdProducer::as_ptr(&self.data) - } - - fn layout(&self) -> Layout { - self.data.layout() - } - - unsafe fn as_ref(&self, ptr: Self::Ptr) -> Self::Item { - ProxyElem { filled: self.filled, item: &mut *ptr } - } - - unsafe fn uget_ptr(&self, i: &Self::Dim) -> Self::Ptr { - self.data.uget_ptr(i) - } - - fn stride_of(&self, axis: Axis) -> Self::Stride { - self.data.stride_of(axis) - } - - #[inline(always)] - fn contiguous_stride(&self) -> Self::Stride { - self.data.contiguous_stride() - } - - fn split_at(self, _axis: Axis, _index: usize) -> (Self, Self) { - unimplemented!(); - } -} - From e47261294d79d40b21c4690a457e5b2c7dc4049b Mon Sep 17 00:00:00 2001 From: bluss Date: Sat, 23 May 2020 22:15:56 +0200 Subject: [PATCH 15/15] FEAT: Combine common parts of apply_collect and par_apply_collect Factor out the common part of the parallel and and regular apply_collect implementation; the non-parallel part came first and ended up more complicated originally. With the parallel version in place, both can use the same main operation which is implemented in the methods Zip::collect_with_partial. --- src/parallel/impl_par_methods.rs | 18 +------ src/zip/mod.rs | 91 ++++++++++++++++++++------------ 2 files changed, 58 insertions(+), 51 deletions(-) diff --git a/src/parallel/impl_par_methods.rs b/src/parallel/impl_par_methods.rs index e5470e170..a4efa560f 100644 --- a/src/parallel/impl_par_methods.rs +++ b/src/parallel/impl_par_methods.rs @@ -99,25 +99,11 @@ macro_rules! zip_impl { }; let collect_result = splits.map(move |zip| { + // Apply the mapping function on this chunk of the zip // Create a partial result for the contiguous slice of data being written to - let output = zip.last_producer(); - debug_assert!(output.is_contiguous()); - let mut partial; unsafe { - partial = Partial::new(output.as_ptr()); + zip.collect_with_partial(&f) } - - // Apply the mapping function on this chunk of the zip - let partial_len = &mut partial.len; - let f = &f; - zip.apply(move |$($p,)* output_elem: *mut R| unsafe { - output_elem.write(f($($p),*)); - if std::mem::needs_drop::() { - *partial_len += 1; - } - }); - - partial }) .reduce(Partial::stub, Partial::try_merge); diff --git a/src/zip/mod.rs b/src/zip/mod.rs index f894390e6..8ce5e9869 100644 --- a/src/zip/mod.rs +++ b/src/zip/mod.rs @@ -914,12 +914,6 @@ zipt_impl! { [A B C D E F][ a b c d e f], } -#[cfg(feature = "rayon")] -macro_rules! last_of { - ($q:ty) => { $q }; - ($p:ty, $($q:ty),+) => { last_of!($($q),+) }; -} - macro_rules! map_impl { ($([$notlast:ident $($p:ident)*],)+) => { $( @@ -1016,14 +1010,6 @@ macro_rules! map_impl { }).is_done() } - #[cfg(feature = "rayon")] - #[allow(dead_code)] // unused for the first of the Zip arities - /// Return a reference to the last producer - pub(crate) fn last_producer(&self) -> &last_of!($($p),*) { - let (.., ref last) = &self.parts; - last - } - expand_if!(@bool [$notlast] /// Include the producer `p` in the Zip. @@ -1068,32 +1054,19 @@ macro_rules! map_impl { /// inputs. /// /// If all inputs are c- or f-order respectively, that is preserved in the output. 
- pub fn apply_collect(self, mut f: impl FnMut($($p::Item,)* ) -> R) -> Array + pub fn apply_collect(self, f: impl FnMut($($p::Item,)* ) -> R) -> Array { // Make uninit result let mut output = self.uninitalized_for_current_layout::(); - if !std::mem::needs_drop::() { - // For elements with no drop glue, just overwrite into the array - self.apply_assign_into(&mut output, f); - } else { - // For generic elements, use a Partial to counts the number of filled elements, - // and can drop the right number of elements on unwinding - unsafe { - let mut output = output.raw_view_mut().cast::(); - let mut partial = Partial::new(output.as_mut_ptr()); - let partial_ref = &mut partial; - debug_assert!(output.is_contiguous()); - debug_assert_eq!(output.layout().tendency() >= 0, self.layout_tendency >= 0); - self.and(output) - .apply(move |$($p, )* output_: *mut R| { - output_.write(f($($p ),*)); - partial_ref.len += 1; - }); - partial.release_ownership(); - } - } + // Use partial to counts the number of filled elements, and can drop the right + // number of elements on unwinding (if it happens during apply/collect). unsafe { + let output_view = output.raw_view_mut().cast::(); + self.and(output_view) + .collect_with_partial(f) + .release_ownership(); + output.assume_init() } } @@ -1126,6 +1099,54 @@ macro_rules! map_impl { } } + expand_if!(@bool [$notlast] + // For collect; Last producer is a RawViewMut + #[allow(non_snake_case)] + impl Zip<($($p,)* PLast), D> + where D: Dimension, + $($p: NdProducer ,)* + PLast: NdProducer, + { + /// The inner workings of apply_collect and par_apply_collect + /// + /// Apply the function and collect the results into the output (last producer) + /// which should be a raw array view; a Partial that owns the written + /// elements is returned. + /// + /// Elements will be overwritten in place (in the sense of std::ptr::write). + /// + /// ## Safety + /// + /// The last producer is a RawArrayViewMut and must be safe to write into. + /// The producer must be c- or f-contig and have the same layout tendency + /// as the whole Zip. + /// + /// The returned Partial's proxy ownership of the elements must be handled, + /// before the array the raw view points to realizes its ownership. + pub(crate) unsafe fn collect_with_partial(self, mut f: F) -> Partial + where F: FnMut($($p::Item,)* ) -> R + { + // Get the last producer; and make a Partial that aliases its data pointer + let (.., ref output) = &self.parts; + debug_assert!(output.layout().is(CORDER | FORDER)); + debug_assert_eq!(output.layout().tendency() >= 0, self.layout_tendency >= 0); + let mut partial = Partial::new(output.as_ptr()); + + // Apply the mapping function on this zip + // if we panic with unwinding; Partial will drop the written elements. + let partial_len = &mut partial.len; + self.apply(move |$($p,)* output_elem: *mut R| { + output_elem.write(f($($p),*)); + if std::mem::needs_drop::() { + *partial_len += 1; + } + }); + + partial + } + } + ); + impl SplitPreference for Zip<($($p,)*), D> where D: Dimension, $($p: NdProducer ,)*