Skip to content

Commit

Permalink
Add ValuesIter to iterate defined values in ArrayAccessor (apache#2578)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 24, 2022
1 parent f11fc1f commit 4fbd87a
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 62 deletions.
36 changes: 33 additions & 3 deletions arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,44 @@ impl<'a, T: Array> Array for &'a T {
pub trait ArrayAccessor: Array {
type Item: Send + Sync;

/// `true` if null indices, i.e. where [`Array::is_null`], have a defined value. The exact
/// value may be unspecified, but it must not be undefined.
///
/// This allows for optimised kernels that can process the null mask separately
/// from processing the value data.
///
/// For example, the value of a [`PrimitiveArray`] is always well defined as it always
/// corresponds to an initialized slot in the underlying buffer, and all bit patterns
/// correspond to a valid primitive value.
///
/// An example where nulls are not well defined is a [`DictionaryArray`]. This is because
/// the key at a null index is unspecified, and consequently may exceed the bounds of the
/// values array.
///
const NULLS_DEFINED: bool;

/// Returns the element at index `index`
///
/// # Panics
/// Panics if the value is outside the bounds of the array
fn value(&self, index: usize) -> Self::Item;
///
/// Panics if the value is outside the bounds of the array or is not well
/// defined, see [`Self::NULLS_DEFINED`]
fn value(&self, index: usize) -> Self::Item {
assert!(index < self.len(), "Out of bounds access");
assert!(
Self::NULLS_DEFINED || self.is_valid(index),
"Value at index {} is not defined",
index
);
unsafe { self.value_unchecked(index) }
}

/// Returns the element at index `index`
///
/// # Safety
/// Caller is responsible for ensuring that the index is within the bounds of the array
///
/// Caller is responsible for ensuring that the index is within the bounds of the array,
/// and that the value at the index is well defined, see [`Self::NULLS_DEFINED`]
unsafe fn value_unchecked(&self, index: usize) -> Self::Item;
}

Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,7 @@ impl<'a, OffsetSize: OffsetSizeTrait> ArrayAccessor
for &'a GenericBinaryArray<OffsetSize>
{
type Item = &'a [u8];

fn value(&self, index: usize) -> Self::Item {
GenericBinaryArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
GenericBinaryArray::value_unchecked(self, index)
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ impl Array for BooleanArray {

impl<'a> ArrayAccessor for &'a BooleanArray {
type Item = bool;

fn value(&self, index: usize) -> Self::Item {
BooleanArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
BooleanArray::value_unchecked(self, index)
Expand Down
4 changes: 1 addition & 3 deletions arrow/src/array/array_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,7 @@ impl<T: DecimalType> fmt::Debug for DecimalArray<T> {
impl<'a, T: DecimalType> ArrayAccessor for &'a DecimalArray<T> {
type Item = Decimal<T>;

fn value(&self, index: usize) -> Self::Item {
DecimalArray::<T>::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
DecimalArray::<T>::value_unchecked(self, index)
Expand Down
8 changes: 1 addition & 7 deletions arrow/src/array/array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,7 @@ where
&'a V: ArrayAccessor,
{
type Item = <&'a V as ArrayAccessor>::Item;

fn value(&self, index: usize) -> Self::Item {
assert!(self.dictionary.is_valid(index), "{}", index);
let value_idx = self.dictionary.keys.value(index).to_usize().unwrap();
// Dictionary indexes should be valid
unsafe { self.values.value_unchecked(value_idx) }
}
const NULLS_DEFINED: bool = false;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
let val = self.dictionary.keys.value_unchecked(index);
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,7 @@ impl Array for FixedSizeBinaryArray {

impl<'a> ArrayAccessor for &'a FixedSizeBinaryArray {
type Item = &'a [u8];

fn value(&self, index: usize) -> Self::Item {
FixedSizeBinaryArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
FixedSizeBinaryArray::value_unchecked(self, index)
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ impl Array for FixedSizeListArray {

impl ArrayAccessor for FixedSizeListArray {
type Item = ArrayRef;

fn value(&self, index: usize) -> Self::Item {
FixedSizeListArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
FixedSizeListArray::value(self, index)
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,7 @@ impl<OffsetSize: OffsetSizeTrait> Array for GenericListArray<OffsetSize> {

impl<'a, OffsetSize: OffsetSizeTrait> ArrayAccessor for &'a GenericListArray<OffsetSize> {
type Item = ArrayRef;

fn value(&self, index: usize) -> Self::Item {
GenericListArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
GenericListArray::value(self, index)
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ impl<T: ArrowPrimitiveType> Array for PrimitiveArray<T> {

impl<'a, T: ArrowPrimitiveType> ArrayAccessor for &'a PrimitiveArray<T> {
type Item = T::Native;

fn value(&self, index: usize) -> Self::Item {
PrimitiveArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
PrimitiveArray::value_unchecked(self, index)
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/array/array_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,7 @@ impl<'a, OffsetSize: OffsetSizeTrait> ArrayAccessor
for &'a GenericStringArray<OffsetSize>
{
type Item = &'a str;

fn value(&self, index: usize) -> Self::Item {
GenericStringArray::value(self, index)
}
const NULLS_DEFINED: bool = true;

unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
GenericStringArray::value_unchecked(self, index)
Expand Down
61 changes: 51 additions & 10 deletions arrow/src/array/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::array::array::ArrayAccessor;
use crate::array::{DecimalArray, FixedSizeBinaryArray};
use crate::datatypes::{Decimal128Type, Decimal256Type};
use std::ops::Range;

use super::{
BooleanArray, GenericBinaryArray, GenericListArray, GenericStringArray,
Expand Down Expand Up @@ -59,18 +60,16 @@ impl<T: ArrayAccessor> Iterator for ArrayIter<T> {
let old = self.current;
self.current += 1;
// Safety:
// we just checked bounds in `self.current_end == self.current`
// this is safe on the premise that this struct is initialized with
// current = array.len()
// and that current_end is ever only decremented
// This is safe as `current` cannot exceed the bounds of the array
// and we have verified that the value is not null
unsafe { Some(Some(self.array.value_unchecked(old))) }
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(
self.array.len() - self.current,
Some(self.array.len() - self.current),
self.current_end - self.current,
Some(self.current_end - self.current),
)
}
}
Expand All @@ -85,10 +84,8 @@ impl<T: ArrayAccessor> DoubleEndedIterator for ArrayIter<T> {
None
} else {
// Safety:
// we just checked bounds in `self.current_end == self.current`
// this is safe on the premise that this struct is initialized with
// current = array.len()
// and that current_end is ever only decremented
// This is safe as `current` cannot exceed the bounds of the array
// and we have verified that the value is not null
unsafe { Some(self.array.value_unchecked(self.current_end)) }
})
}
Expand All @@ -115,6 +112,50 @@ pub type Decimal128Iter<'a> = DecimalIter<'a, Decimal128Type>;
/// [`super::Decimal256Array`]
pub type Decimal256Iter<'a> = DecimalIter<'a, Decimal256Type>;

/// An iterator over the defined values within an array
///
/// Note: Unlike [`ArrayIter`] this may return values for null indexes, provided
/// doing so is well defined
///
/// The iteration strategy is chosen once, at construction, and stored as a
/// [`ValuesIterInner`].
pub(crate) struct ValuesIter<T: ArrayAccessor> {
// The strategy selected when this iterator was constructed
inner: ValuesIterInner<T>,
}

impl<T: ArrayAccessor> ValuesIter<T> {
    /// Creates a [`ValuesIter`] over `array`
    ///
    /// If [`ArrayAccessor::NULLS_DEFINED`] is `true` for `T`, or `array` reports a
    /// null count of zero, every index in `0..array.len()` is visited directly.
    /// Otherwise iteration is delegated to an [`ArrayIter`] over `array`.
    pub(crate) fn new(array: T) -> Self {
        let every_slot_readable = T::NULLS_DEFINED || array.null_count() == 0;

        let inner = if every_slot_readable {
            ValuesIterInner::Indices(0..array.len(), array)
        } else {
            ValuesIterInner::Array(ArrayIter::new(array))
        };

        Self { inner }
    }
}

impl<T: ArrayAccessor> Iterator for ValuesIter<T> {
    type Item = Option<T::Item>;

    /// Advances the iterator
    ///
    /// In the index-based strategy every remaining index yields `Some(Some(value))`;
    /// in the delegating strategy the result of the wrapped iterator is returned as is.
    fn next(&mut self) -> Option<Self::Item> {
        match &mut self.inner {
            ValuesIterInner::Array(iter) => iter.next(),
            ValuesIterInner::Indices(remaining, array) => {
                // Stop once the index range is exhausted
                let index = remaining.next()?;
                // SAFETY: `ValuesIter::new` builds `Indices` with the range
                // `0..array.len()`, and only when `T::NULLS_DEFINED` is `true` or the
                // array has no nulls, so `index` is in bounds and its value is defined
                let value = unsafe { array.value_unchecked(index) };
                Some(Some(value))
            }
        }
    }

    /// Forwards the size hint of whichever strategy is in use
    fn size_hint(&self) -> (usize, Option<usize>) {
        match &self.inner {
            ValuesIterInner::Array(iter) => iter.size_hint(),
            ValuesIterInner::Indices(remaining, _) => remaining.size_hint(),
        }
    }
}

/// The iteration strategy backing a [`ValuesIter`]
enum ValuesIterInner<T: ArrayAccessor> {
/// Visit every index in the range and read the value from the array
/// without consulting the null mask
Indices(Range<usize>, T),
/// Delegate to an [`ArrayIter`] over the array
Array(ArrayIter<T>),
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
27 changes: 16 additions & 11 deletions arrow/src/compute/kernels/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,25 @@ where
));
}

let len = left.len();

let null_bit_buffer =
combine_option_bitmap(&[left.data_ref(), right.data_ref()], left.len())?;

// Safety:
// `i < $left.len()` and $left.len() == $right.len()
let comparison = (0..left.len())
.map(|i| unsafe { op(left.value_unchecked(i), right.value_unchecked(i)) });
// same size as $left.len() and $right.len()
let comparison =
ValuesIter::new(left)
.zip(ValuesIter::new(right))
.map(|(a, b)| match (a, b) {
(Some(a), Some(b)) => op(a, b),
_ => false,
});
// SAFETY: ValuesIter is a trusted length iterator
let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };

let data = unsafe {
ArrayData::new_unchecked(
DataType::Boolean,
left.len(),
len,
None,
null_bit_buffer,
0,
Expand All @@ -91,16 +96,16 @@ where
.null_buffer()
.map(|b| b.bit_slice(left.offset(), left.len()));

// Safety:
// `i < $left.len()`
let comparison = (0..left.len()).map(|i| unsafe { op(left.value_unchecked(i)) });
// same as $left.len()
let len = left.len();

let comparison = ValuesIter::new(left).map(|x| x.map(|v| op(v)).unwrap_or(false));
// SAFETY: ValuesIter is a trusted length iterator
let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };

let data = unsafe {
ArrayData::new_unchecked(
DataType::Boolean,
left.len(),
len,
None,
null_bit_buffer,
0,
Expand Down

0 comments on commit 4fbd87a

Please sign in to comment.