Skip to content

Commit

Permalink
Specialize interleave string ~2-3x faster (#2944)
Browse files Browse the repository at this point in the history
* Add interleave string benchmark

* Specialize interleave strings (#2864)

* Review feedback
  • Loading branch information
tustvold committed Oct 27, 2022
1 parent d625f0a commit 66ea66b
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 52 deletions.
10 changes: 4 additions & 6 deletions arrow-array/src/array/string_array.rs
Expand Up @@ -90,8 +90,8 @@ impl<OffsetSize: OffsetSizeTrait> GenericStringArray<OffsetSize> {
/// caller is responsible for ensuring that index is within the array bounds
#[inline]
pub unsafe fn value_unchecked(&self, i: usize) -> &str {
let end = self.value_offsets().get_unchecked(i + 1);
let start = self.value_offsets().get_unchecked(i);
let end = self.value_offsets().get_unchecked(i + 1).as_usize();
let start = self.value_offsets().get_unchecked(i).as_usize();

// Soundness
// pointer alignment & location is ensured by RawPtrBox
Expand All @@ -103,10 +103,8 @@ impl<OffsetSize: OffsetSizeTrait> GenericStringArray<OffsetSize> {
// OffsetSizeTrait. Currently, only i32 and i64 implement OffsetSizeTrait,
// both of which should cleanly cast to isize on an architecture that supports
// 32/64-bit offsets
let slice = std::slice::from_raw_parts(
self.value_data.as_ptr().offset(start.to_isize().unwrap()),
(*end - *start).to_usize().unwrap(),
);
let slice =
std::slice::from_raw_parts(self.value_data.as_ptr().add(start), end - start);
std::str::from_utf8_unchecked(slice)
}

Expand Down
107 changes: 83 additions & 24 deletions arrow-select/src/interleave.rs
Expand Up @@ -16,11 +16,11 @@
// under the License.

use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::cast::as_primitive_array;
use arrow_array::{
downcast_primitive, make_array, new_empty_array, Array, ArrayRef, ArrowPrimitiveType,
PrimitiveArray,
GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType};
Expand Down Expand Up @@ -85,51 +85,110 @@ pub fn interleave(

downcast_primitive! {
data_type => (primitive_helper, values, indices, data_type),
DataType::Utf8 => interleave_string::<i32>(values, indices, data_type),
DataType::LargeUtf8 => interleave_string::<i64>(values, indices, data_type),
_ => interleave_fallback(values, indices)
}
}

/// Common functionality for interleaving arrays
///
/// T is the concrete Array type
struct Interleave<'a, T> {
/// The input arrays downcast to T
arrays: Vec<&'a T>,
/// The number of nulls in the interleaved output
null_count: usize,
/// The null buffer of the interleaved output
nulls: Option<Buffer>,
}

impl<'a, T: Array + 'static> Interleave<'a, T> {
fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
let mut has_nulls = false;
let arrays: Vec<&T> = values
.iter()
.map(|x| {
has_nulls = has_nulls || x.null_count() != 0;
x.as_any().downcast_ref().unwrap()
})
.collect();

let mut null_count = 0;
let nulls = has_nulls.then(|| {
let mut builder = BooleanBufferBuilder::new(indices.len());
for (a, b) in indices {
let v = arrays[*a].is_valid(*b);
null_count += !v as usize;
builder.append(v)
}
builder.finish()
});

Self {
arrays,
null_count,
nulls,
}
}
}

fn interleave_primitive<T: ArrowPrimitiveType>(
values: &[&dyn Array],
indices: &[(usize, usize)],
data_type: &DataType,
) -> Result<ArrayRef, ArrowError> {
let mut has_nulls = false;
let cast: Vec<_> = values
.iter()
.map(|x| {
has_nulls = has_nulls || x.null_count() != 0;
as_primitive_array::<T>(*x)
})
.collect();
let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);

let mut values = BufferBuilder::<T::Native>::new(indices.len());
for (a, b) in indices {
let v = cast[*a].value(*b);
let v = interleaved.arrays[*a].value(*b);
values.append(v)
}

let mut null_count = 0;
let nulls = has_nulls.then(|| {
let mut builder = BooleanBufferBuilder::new(indices.len());
for (a, b) in indices {
let v = cast[*a].is_valid(*b);
null_count += !v as usize;
builder.append(v)
}
builder.finish()
});

let builder = ArrayDataBuilder::new(data_type.clone())
.len(indices.len())
.add_buffer(values.finish())
.null_bit_buffer(nulls)
.null_count(null_count);
.null_bit_buffer(interleaved.nulls)
.null_count(interleaved.null_count);

let data = unsafe { builder.build_unchecked() };
Ok(Arc::new(PrimitiveArray::<T>::from(data)))
}

fn interleave_string<O: OffsetSizeTrait>(
values: &[&dyn Array],
indices: &[(usize, usize)],
data_type: &DataType,
) -> Result<ArrayRef, ArrowError> {
let interleaved = Interleave::<'_, GenericStringArray<O>>::new(values, indices);

let mut capacity = 0;
let mut offsets = BufferBuilder::<O>::new(indices.len() + 1);
offsets.append(O::from_usize(0).unwrap());
for (a, b) in indices {
let o = interleaved.arrays[*a].value_offsets();
let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
capacity += element_len;
offsets.append(O::from_usize(capacity).expect("overflow"));
}

let mut values = MutableBuffer::new(capacity);
for (a, b) in indices {
values.extend_from_slice(interleaved.arrays[*a].value(*b).as_bytes());
}

let builder = ArrayDataBuilder::new(data_type.clone())
.len(indices.len())
.add_buffer(offsets.finish())
.add_buffer(values.into())
.null_bit_buffer(interleaved.nulls)
.null_count(interleaved.null_count);

let data = unsafe { builder.build_unchecked() };
Ok(Arc::new(GenericStringArray::<O>::from(data)))
}

/// Fallback implementation of interleave using [`MutableArrayData`]
fn interleave_fallback(
values: &[&dyn Array],
Expand Down
43 changes: 21 additions & 22 deletions arrow/benches/interleave_kernels.rs
Expand Up @@ -60,31 +60,30 @@ fn do_bench(
}

fn add_benchmark(c: &mut Criterion) {
let a = create_primitive_array::<Int32Type>(1024, 0.);
let i32 = create_primitive_array::<Int32Type>(1024, 0.);
let i32_opt = create_primitive_array::<Int32Type>(1024, 0.5);
let string = create_string_array_with_len::<i32>(1024, 0., 20);
let string_opt = create_string_array_with_len::<i32>(1024, 0.5, 20);

do_bench(c, "i32(0.0)", 100, &a, &[0..100, 100..230, 450..1000]);
do_bench(c, "i32(0.0)", 400, &a, &[0..100, 100..230, 450..1000]);
do_bench(c, "i32(0.0)", 1024, &a, &[0..100, 100..230, 450..1000]);
do_bench(
c,
"i32(0.0)",
1024,
&a,
&[0..100, 100..230, 450..1000, 0..1000],
);
let cases: &[(&str, &dyn Array)] = &[
("i32(0.0)", &i32),
("i32(0.5)", &i32_opt),
("str(20, 0.0)", &string),
("str(20, 0.5)", &string_opt),
];

let a = create_primitive_array::<Int32Type>(1024, 0.5);
for (prefix, base) in cases {
let slices: &[(usize, &[_])] = &[
(100, &[0..100, 100..230, 450..1000]),
(400, &[0..100, 100..230, 450..1000]),
(1024, &[0..100, 100..230, 450..1000]),
(1024, &[0..100, 100..230, 450..1000, 0..1000]),
];

do_bench(c, "i32(0.5)", 100, &a, &[0..100, 100..230, 450..1000]);
do_bench(c, "i32(0.5)", 400, &a, &[0..100, 100..230, 450..1000]);
do_bench(c, "i32(0.5)", 1024, &a, &[0..100, 100..230, 450..1000]);
do_bench(
c,
"i32(0.5)",
1024,
&a,
&[0..100, 100..230, 450..1000, 0..1000],
);
for (len, slice) in slices {
do_bench(c, prefix, *len, *base, slice);
}
}
}

criterion_group!(benches, add_benchmark);
Expand Down

0 comments on commit 66ea66b

Please sign in to comment.