diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 35b70a0485c..1f841a0ee17 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -41,7 +41,7 @@ mod filter; mod selection; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; +pub use selection::{intersect_row_selections, RowSelection, RowSelector}; /// A generic builder for constructing sync or async arrow parquet readers. This is not intended /// to be used directly, instead you should use the specialization for the type of reader diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 357960906c8..e01c584b6e6 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -349,6 +349,66 @@ impl From for VecDeque { } } +// Combine two lists of `RowSelection` return the intersection of them +// For example: +// self: NNYYYYNNYYNYN +// other: NYNNNNNNY +// +// returned: NNNNNNNNYYNYN +pub fn intersect_row_selections( + left: Vec, + right: Vec, +) -> Vec { + let mut res = Vec::with_capacity(left.len()); + let mut l_iter = left.into_iter().peekable(); + let mut r_iter = right.into_iter().peekable(); + + while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) { + if a.row_count == 0 { + l_iter.next().unwrap(); + continue; + } + if b.row_count == 0 { + r_iter.next().unwrap(); + continue; + } + match (a.skip, b.skip) { + // Keep both ranges + (false, false) => { + if a.row_count < b.row_count { + res.push(RowSelector::select(a.row_count)); + b.row_count -= a.row_count; + l_iter.next().unwrap(); + } else { + res.push(RowSelector::select(b.row_count)); + a.row_count -= b.row_count; + r_iter.next().unwrap(); + } + } + // skip at least one + _ => { + if a.row_count < b.row_count { + res.push(RowSelector::skip(a.row_count)); + b.row_count -= a.row_count; + l_iter.next().unwrap(); + } else { + res.push(RowSelector::skip(b.row_count)); + a.row_count -= b.row_count; + r_iter.next().unwrap(); + } + } + } + } + + if l_iter.peek().is_some() { + res.extend(l_iter); + } + if r_iter.peek().is_some() { + res.extend(r_iter); + } + res +} + fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec) { let selector = if skip { RowSelector::skip(sum_row) @@ -618,6 +678,74 @@ mod tests { a.and_then(&b); } + #[test] + fn test_intersect_row_selection_and_combine() { + // a size equal b size + let a = vec![ + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(1), + ]; + let b = vec![ + RowSelector::select(8), + RowSelector::skip(1), + RowSelector::select(1), + ]; + + let res = intersect_row_selections(a, b); + assert_eq!( + RowSelection::from_selectors_and_combine(&res).selectors, + vec![ + RowSelector::select(5), + RowSelector::skip(4), + RowSelector::select(1), + ], + ); + + // a size larger than b size + let a = vec![ + RowSelector::select(3), + RowSelector::skip(33), + RowSelector::select(3), + RowSelector::skip(33), + ]; + let b = vec![RowSelector::select(36), RowSelector::skip(36)]; + let res = intersect_row_selections(a, b); + assert_eq!( + RowSelection::from_selectors_and_combine(&res).selectors, + vec![RowSelector::select(3), RowSelector::skip(69)] + ); + + // a size less than b size + let a = vec![RowSelector::select(3), RowSelector::skip(7)]; + let b = vec![ + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + ]; + let res = intersect_row_selections(a, b); + assert_eq!( + RowSelection::from_selectors_and_combine(&res).selectors, + vec![RowSelector::select(2), RowSelector::skip(8)] + ); + + let a = vec![RowSelector::select(3), RowSelector::skip(7)]; + let b = vec![ + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(2), + ]; + let res = intersect_row_selections(a, b); + assert_eq!( + RowSelection::from_selectors_and_combine(&res).selectors, + vec![RowSelector::select(2), RowSelector::skip(8)] + ); + } + #[test] fn test_and_fuzz() { let mut rand = thread_rng();