Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter for run end array #5573

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
167 changes: 165 additions & 2 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

//! Defines filter kernels

use std::ops::AddAssign;
use std::sync::Arc;

use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType};
use arrow_array::types::{
ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, Int16Type, Int32Type, Int64Type,
RunEndIndexType,
};
use arrow_array::*;
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator};
use arrow_data::transform::MutableArrayData;
Expand Down Expand Up @@ -336,6 +340,12 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array
DataType::LargeBinary => {
Ok(Arc::new(filter_bytes(values.as_binary::<i64>(), predicate)))
}
DataType::RunEndEncoded(_, _) => {
downcast_run_array!{
values => filter_run_end_array(values, predicate),
t => unimplemented!("Filter not supported for RunEndEncoded type {:?}", t)
}
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
values => Ok(Arc::new(filter_dict(values, predicate))),
t => unimplemented!("Filter not supported for dictionary type {:?}", t)
Expand Down Expand Up @@ -368,6 +378,86 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array
}
}

fn filter_run_end_array<R: RunEndIndexType>(
ree_arr: &RunArray<R>,
pred: &FilterPredicate,
) -> Result<ArrayRef, ArrowError> {
fn downcast_safe<A: RunEndIndexType, T: RunEndIndexType>(
re_arr: &RunArray<A>,
) -> Option<&RunArray<T>> {
re_arr.as_any().downcast_ref::<RunArray<T>>()
}

if let Some(ree_array) = downcast_safe::<R, Int64Type>(ree_arr) {
let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?;
let ree_arr: RunArray<Int64Type> =
RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?;
Ok(Arc::new(ree_arr))
} else if let Some(ree_array) = downcast_safe::<R, Int32Type>(ree_arr) {
let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?;
let ree_arr: RunArray<Int32Type> =
RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?;
Ok(Arc::new(ree_arr))
} else if let Some(ree_array) = downcast_safe::<R, Int16Type>(ree_arr) {
let (run_ends, values) = filter_run_end_array_generic(ree_array, pred)?;
let ree_arr: RunArray<Int16Type> =
RunArray::try_new(&PrimitiveArray::from(run_ends), &values)?;
Ok(Arc::new(ree_arr))
} else {
Err(ArrowError::CastError(
"Run ends for RunArray must be i16 or i32 or i64".to_string(),
))
}
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
}

/// Filter any supported [`RunArray`] based on a [`FilterPredicate`]
#[allow(clippy::type_complexity)]
fn filter_run_end_array_generic<R: RunEndIndexType>(
re_arr: &RunArray<R>,
pred: &FilterPredicate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can leave a TODO item to utilize the IterationStrategy within FilterPredicate for potential performance benefit to keep this PR as more an initial version of filter for run end arrays?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could handle the None case in this PR, the index based ones are a poor fit for REE I suspect unless the selectivity of the filter is high. I'd need a benchmark but in short I would prefer to leave it as TODO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think None case is already handled by the parent

IterationStrategy::None => Ok(new_empty_array(values.data_type())),

) -> Result<(Vec<R::Native>, Arc<dyn Array>), ArrowError>
where
R::Native: Into<i64> + From<bool>,
R::Native: AddAssign,
{
let run_ends: &RunEndBuffer<R::Native> = re_arr.run_ends();
let mut values_filter = BooleanBufferBuilder::new(run_ends.len());
let mut new_run_ends = vec![R::default_value(); run_ends.len()];

let mut start = 0i64;
let mut i = 0;
let filter_values = pred.filter.values();
let mut count = R::default_value();

for end in run_ends.inner().into_iter().map(|i| (*i).into()) {
let mut keep = false;
// in filter_array the predicate array is checked to have the same len as the run end array
// this means the largest value in the run_ends is == to pred.len()
// so we're always within bounds when calling value_unchecked
for pred in (start..end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) {
count += R::Native::from(pred);
keep |= pred
}
// this is to avoid branching
new_run_ends[i] = count;
i += keep as usize;

values_filter.append(keep);
start = end;
}

new_run_ends.truncate(i);

if values_filter.is_empty() {
new_run_ends.clear();
}
Comment on lines +415 to +419
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you allocate the new_run_ends as just an empty Vec but with run_ends.len() capacity and push to it, you probably won't need this part

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using this trick

        new_run_ends[i] = count;
        i += keep as usize;

to make it branchless, I can't do the same thing with push


let values = re_arr.values();
let pred = BooleanArray::new(values_filter.finish(), None);
let new_values = filter(&values, &pred)?;
Ok((new_run_ends, new_values))
}

/// Computes a new null mask for `data` based on `predicate`
///
/// If the predicate selected no null-rows, returns `None`, otherwise returns
Expand Down Expand Up @@ -635,6 +725,7 @@ where
#[cfg(test)]
mod tests {
use arrow_array::builder::*;
use arrow_array::cast::as_run_array;
use arrow_array::types::*;
use rand::distributions::{Alphanumeric, Standard};
use rand::prelude::*;
Expand Down Expand Up @@ -844,6 +935,78 @@ mod tests {
assert_eq!(9, d.value(1));
}

#[test]
fn test_filter_run_end_encoding_array() {
let run_ends = Int64Array::from(vec![2, 3, 8]);
let values = Int64Array::from(vec![7, -2, 9]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![true, false, true, false, true, false, true, false]);
let c = filter(&a, &b).unwrap();
let actual: &RunArray<Int64Type> = as_run_array(&c);
assert_eq!(4, actual.len());

let expected = RunArray::try_new(
&Int64Array::from(vec![1, 2, 4]),
&Int64Array::from(vec![7, -2, 9]),
)
.expect("Failed to make expected RunArray test is broken");

assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
assert_eq!(actual.values(), expected.values())
}

#[test]
fn test_filter_run_end_encoding_array_remove_value() {
let run_ends = Int32Array::from(vec![2, 3, 8, 10]);
let values = Int32Array::from(vec![7, -2, 9, -8]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![
false, true, false, false, true, false, true, false, false, false,
]);
let c = filter(&a, &b).unwrap();
let actual: &RunArray<Int32Type> = as_run_array(&c);
assert_eq!(3, actual.len());

let expected =
RunArray::try_new(&Int32Array::from(vec![1, 3]), &Int32Array::from(vec![7, 9]))
.expect("Failed to make expected RunArray test is broken");

assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
assert_eq!(actual.values(), expected.values())
}

#[test]
fn test_filter_run_end_encoding_array_remove_all_but_one() {
let run_ends = Int16Array::from(vec![2, 3, 8, 10]);
let values = Int16Array::from(vec![7, -2, 9, -8]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![
false, false, false, false, false, false, true, false, false, false,
]);
let c = filter(&a, &b).unwrap();
let actual: &RunArray<Int16Type> = as_run_array(&c);
assert_eq!(1, actual.len());

let expected = RunArray::try_new(&Int16Array::from(vec![1]), &Int16Array::from(vec![9]))
.expect("Failed to make expected RunArray test is broken");

assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
assert_eq!(actual.values(), expected.values())
}

#[test]
fn test_filter_run_end_encoding_array_empty() {
let run_ends = Int64Array::from(vec![2, 3, 8, 10]);
let values = Int64Array::from(vec![7, -2, 9, -8]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![
false, false, false, false, false, false, false, false, false, false,
]);
let c = filter(&a, &b).unwrap();
let actual: &RunArray<Int64Type> = as_run_array(&c);
assert_eq!(0, actual.len());
}

#[test]
fn test_filter_dictionary_array() {
let values = [Some("hello"), None, Some("world"), Some("!")];
Expand Down