Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter for run end array #5573

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 112 additions & 2 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::sync::Arc;

use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType};
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, Int64Type};
use arrow_array::*;
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator};
use arrow_data::transform::MutableArrayData;
Expand Down Expand Up @@ -336,6 +336,13 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array
DataType::LargeBinary => {
Ok(Arc::new(filter_bytes(values.as_binary::<i64>(), predicate)))
}
DataType::RunEndEncoded(_, _) => {
if let Some(ree_array) = values.as_any().downcast_ref::<RunArray<Int64Type>>() {
Ok(Arc::new(filter_run_end_array(ree_array, predicate)?))
} else {
unimplemented!("Filter not supported for RunEndEncoded type {:?}", values.data_type())
}
}
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
DataType::Dictionary(_, _) => downcast_dictionary_array! {
values => Ok(Arc::new(filter_dict(values, predicate))),
t => unimplemented!("Filter not supported for dictionary type {:?}", t)
Expand Down Expand Up @@ -368,6 +375,43 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array
}
}

/// Filter a [`RunArray`] based on a [`FilterPredicate`]
fn filter_run_end_array(
re_arr: &RunArray<Int64Type>,
pred: &FilterPredicate,
) -> Result<RunArray<Int64Type>, ArrowError> {
let run_ends: &RunEndBuffer<i64> = re_arr.run_ends();
let mut values_filter = BooleanBufferBuilder::new(run_ends.len());
let mut new_run_ends = vec![0i64; run_ends.len()];

let mut start = 0i64;
let mut i = 0;
let filter_values = pred.filter.values();
let mut count = 0;
for end in run_ends.inner() {
let mut keep = false;
for pred in (start..*end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) {
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
count += pred as i64;
keep |= pred
}
new_run_ends[i] = count;
i += keep as usize;
values_filter.append(keep);
start = *end;
}

new_run_ends.truncate(i);

if values_filter.is_empty() {
new_run_ends.clear();
}
Comment on lines +415 to +419
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you allocate the new_run_ends as just an empty Vec but with run_ends.len() capacity and push to it, you probably won't need this part

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using this trick

        new_run_ends[i] = count;
        i += keep as usize;

to make it branchless, I can't do the same thing with push


let values = re_arr.values();
let pred = BooleanArray::new(values_filter.finish(), None);
let new_values = filter(&values, &pred)?;
RunArray::try_new(&PrimitiveArray::from(new_run_ends), &new_values)
}

/// Computes a new null mask for `data` based on `predicate`
///
/// If the predicate selected no null-rows, returns `None`, otherwise returns
Expand Down Expand Up @@ -844,6 +888,72 @@ mod tests {
assert_eq!(9, d.value(1));
}

#[test]
fn test_filter_run_end_encoding_array() {
let run_ends = Int64Array::from(vec![2, 3, 8]);
let values = Int64Array::from(vec![7, -2, 9]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![true, false, true, false, true, false, true, false]);
let c = filter(&a, &b).unwrap();
let actual = c
.as_ref()
.as_any()
.downcast_ref::<RunArray<Int64Type>>()
.unwrap();
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(4, actual.len());

let expected = RunArray::try_new(
&Int64Array::from(vec![1, 2, 4]),
&Int64Array::from(vec![7, -2, 9]),
)
.expect("Failed to make expected RunArray test is broken");

assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
assert_eq!(actual.values(), expected.values())
}

#[test]
fn test_filter_run_end_encoding_array_remove_value() {
let run_ends = Int64Array::from(vec![2, 3, 8, 10]);
let values = Int64Array::from(vec![7, -2, 9, -8]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![
false, true, false, false, true, false, true, false, false, false,
]);
let c = filter(&a, &b).unwrap();
let actual = c
.as_ref()
.as_any()
.downcast_ref::<RunArray<Int64Type>>()
.unwrap();
assert_eq!(3, actual.len());

let expected =
RunArray::try_new(&Int64Array::from(vec![1, 3]), &Int64Array::from(vec![7, 9]))
.expect("Failed to make expected RunArray test is broken");

assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
assert_eq!(actual.values(), expected.values())
}

#[test]
fn test_filter_run_end_encoding_array_empty() {
let run_ends = Int64Array::from(vec![2, 3, 8, 10]);
let values = Int64Array::from(vec![7, -2, 9, -8]);
let a = RunArray::try_new(&run_ends, &values).expect("Failed to create RunArray");
let b = BooleanArray::from(vec![
false, false, false, false, false, false, false, false, false, false,
]);
let c = filter(&a, &b).unwrap();
let actual = c
.as_ref()
.as_any()
.downcast_ref::<RunArray<Int64Type>>()
.unwrap();

assert_eq!(0, actual.len());
}

#[test]
fn test_filter_dictionary_array() {
let values = [Some("hello"), None, Some("world"), Some("!")];
Expand Down