Skip to content

Commit

Permalink
refactor avoid pub mod
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Nov 8, 2022
1 parent 2e3c478 commit 7627627
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 61 deletions.
62 changes: 61 additions & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Expand Up @@ -40,7 +40,7 @@ use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;

mod filter;
pub mod selection;
mod selection;

pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
Expand Down Expand Up @@ -590,6 +590,66 @@ impl ParquetRecordBatchReader {
}
}

// Combine two lists of `RowSelection` return the intersection of them
// For example:
// self: NNYYYYNNYYNYN
// other: NYNNNNNNY
//
// returned: NNNNNNNNYYNYN
pub fn intersect_row_selections(
left: Vec<RowSelector>,
right: Vec<RowSelector>,
) -> Vec<RowSelector> {
let mut res = Vec::with_capacity(left.len());
let mut l_iter = left.into_iter().peekable();
let mut r_iter = right.into_iter().peekable();

while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
if a.row_count == 0 {
l_iter.next().unwrap();
continue;
}
if b.row_count == 0 {
r_iter.next().unwrap();
continue;
}
match (a.skip, b.skip) {
// Keep both ranges
(false, false) => {
if a.row_count < b.row_count {
res.push(RowSelector::select(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::select(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
// skip at least one
_ => {
if a.row_count < b.row_count {
res.push(RowSelector::skip(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::skip(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
}
}

if l_iter.peek().is_some() {
res.extend(l_iter);
}
if r_iter.peek().is_some() {
res.extend(r_iter);
}
res
}

/// Returns `true` if `selection` is `None` or selects some rows
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
selection.map(|x| x.selects_any()).unwrap_or(true)
Expand Down
61 changes: 1 addition & 60 deletions parquet/src/arrow/arrow_reader/selection.rs
Expand Up @@ -349,66 +349,6 @@ impl From<RowSelection> for VecDeque<RowSelector> {
}
}

// Combine two lists of `RowSelection` return the intersection of them
// For example:
// self: NNYYYYNNYYNYN
// other: NYNNNNNNY
//
// returned: NNNNNNNNYYNYN
pub fn intersect_row_selections(
left: Vec<RowSelector>,
right: Vec<RowSelector>,
) -> Vec<RowSelector> {
let mut res = Vec::with_capacity(left.len());
let mut l_iter = left.into_iter().peekable();
let mut r_iter = right.into_iter().peekable();

while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
if a.row_count == 0 {
l_iter.next().unwrap();
continue;
}
if b.row_count == 0 {
r_iter.next().unwrap();
continue;
}
match (a.skip, b.skip) {
// Keep both ranges
(false, false) => {
if a.row_count < b.row_count {
res.push(RowSelector::select(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::select(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
// skip at least one
_ => {
if a.row_count < b.row_count {
res.push(RowSelector::skip(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::skip(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
}
}

if l_iter.peek().is_some() {
res.extend(l_iter);
}
if r_iter.peek().is_some() {
res.extend(r_iter);
}
res
}

fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
let selector = if skip {
RowSelector::skip(sum_row)
Expand All @@ -421,6 +361,7 @@ fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelecto
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::arrow_reader::intersect_row_selections;
use crate::format::PageLocation;
use rand::{thread_rng, Rng};

Expand Down

0 comments on commit 7627627

Please sign in to comment.