Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move intersect_row_selections from datafusion to arrow-rs. #3047

Merged
merged 2 commits into from Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Expand Up @@ -43,7 +43,7 @@ mod filter;
mod selection;

pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
pub use selection::{intersect_row_selections, RowSelection, RowSelector};

/// A generic builder for constructing sync or async arrow parquet readers. This is not intended
/// to be used directly, instead you should use the specialization for the type of reader
Expand Down
128 changes: 128 additions & 0 deletions parquet/src/arrow/arrow_reader/selection.rs
Expand Up @@ -349,6 +349,66 @@ impl From<RowSelection> for VecDeque<RowSelector> {
}
}

/// Combine two lists of [`RowSelector`]s and return their intersection.
///
/// Both inputs are walked in lockstep over the same row positions. A run of
/// rows is selected in the output only where `left` and `right` both select
/// it; if either side skips a run, the output skips it.
///
/// If one input covers more rows than the other, the remaining selectors of
/// the longer input are appended to the result unchanged. Selectors with a
/// `row_count` of zero never appear in the output. Adjacent selectors of the
/// same kind are not merged.
///
/// For example:
///
/// ```text
/// left:     NNYYYYNNYYNYN
/// right:    NYNNNNNNY
///
/// returned: NNNNNNNNYYNYN
/// ```
pub fn intersect_row_selections(
    left: Vec<RowSelector>,
    right: Vec<RowSelector>,
) -> Vec<RowSelector> {
    let mut res = Vec::with_capacity(left.len());
    let mut l_iter = left.into_iter().peekable();
    let mut r_iter = right.into_iter().peekable();

    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
        // Empty selectors carry no rows; drop them so they cannot stall the walk.
        if a.row_count == 0 {
            l_iter.next();
            continue;
        }
        if b.row_count == 0 {
            r_iter.next();
            continue;
        }

        // The overlapping run is kept only when both sides select it.
        let overlap = a.row_count.min(b.row_count);
        let selector = if a.skip || b.skip {
            RowSelector::skip(overlap)
        } else {
            RowSelector::select(overlap)
        };
        res.push(selector);

        // Consume the shorter selector and shrink the other by the overlap.
        // On a tie the right one is consumed; the left one drops to zero rows
        // and is discarded by the check above or by the tail filter below.
        if a.row_count < b.row_count {
            b.row_count -= overlap;
            l_iter.next();
        } else {
            a.row_count -= overlap;
            r_iter.next();
        }
    }

    // At most one of the iterators still has items. Append that tail, leaving
    // out any zero-length selector (such as the remainder of a tie on the
    // final pair) so that the output never contains empty selectors.
    res.extend(l_iter.filter(|s| s.row_count != 0));
    res.extend(r_iter.filter(|s| s.row_count != 0));
    res
}

fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
let selector = if skip {
RowSelector::skip(sum_row)
Expand Down Expand Up @@ -618,6 +678,74 @@ mod tests {
a.and_then(&b);
}

/// Checks `intersect_row_selections` on inputs with differing selector counts
/// and differing total row counts. Each raw result is normalised with
/// `RowSelection::from_selectors_and_combine` before comparison, so the
/// expectations describe merged runs rather than the exact selector split.
#[test]
fn test_intersect_row_selection_and_combine() {
    // a and b have the same number of selectors
    let a = vec![
        RowSelector::select(5),
        RowSelector::skip(4),
        RowSelector::select(1),
    ];
    let b = vec![
        RowSelector::select(8),
        RowSelector::skip(1),
        RowSelector::select(1),
    ];

    let res = intersect_row_selections(a, b);
    assert_eq!(
        RowSelection::from_selectors_and_combine(&res).selectors,
        vec![
            RowSelector::select(5),
            RowSelector::skip(4),
            RowSelector::select(1),
        ],
    );

    // a has more selectors than b
    let a = vec![
        RowSelector::select(3),
        RowSelector::skip(33),
        RowSelector::select(3),
        RowSelector::skip(33),
    ];
    let b = vec![RowSelector::select(36), RowSelector::skip(36)];
    let res = intersect_row_selections(a, b);
    assert_eq!(
        RowSelection::from_selectors_and_combine(&res).selectors,
        vec![RowSelector::select(3), RowSelector::skip(69)]
    );

    // a has fewer selectors than b
    let a = vec![RowSelector::select(3), RowSelector::skip(7)];
    let b = vec![
        RowSelector::select(2),
        RowSelector::skip(2),
        RowSelector::select(2),
        RowSelector::skip(2),
        RowSelector::select(2),
    ];
    let res = intersect_row_selections(a, b);
    assert_eq!(
        RowSelection::from_selectors_and_combine(&res).selectors,
        vec![RowSelector::select(2), RowSelector::skip(8)]
    );

    // a covers more rows than b (9 vs 4): once b is exhausted, the rest of a
    // is carried over unchanged
    let a = vec![
        RowSelector::select(3),
        RowSelector::skip(2),
        RowSelector::select(4),
    ];
    let b = vec![RowSelector::skip(1), RowSelector::select(3)];
    let res = intersect_row_selections(a, b);
    assert_eq!(
        RowSelection::from_selectors_and_combine(&res).selectors,
        vec![
            RowSelector::skip(1),
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(4),
        ]
    );
}

#[test]
fn test_and_fuzz() {
let mut rand = thread_rng();
Expand Down