Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine multiple selections into the same batch size in skip_records #2359

Merged
merged 2 commits into from Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 50 additions & 40 deletions parquet/src/arrow/arrow_reader.rs
Expand Up @@ -287,43 +287,59 @@ impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
let to_read = match self.selection.as_mut() {
Some(selection) => loop {
let front = selection.pop_front()?;
if front.skip {
let skipped = match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};

if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
let mut read_records = 0;
match self.selection.as_mut() {
Some(selection) => {
while read_records < self.batch_size && !selection.is_empty() {
let front = selection.pop_front()?;
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
if front.skip {
let skipped =
match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};

if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
}
continue;
}
continue;
}

// try to read record
let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) if remaining != 0 => {
                            // If the page's row count is less than batch_size, we must cap the
                            // read at the page row count. This check avoids an infinite loop.
selection.push_front(RowSelection::select(remaining));
self.batch_size
// try to read record
let need_read = self.batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read) {
Some(remaining) if remaining != 0 => {
                                // If the page's row count is less than batch_size, we must cap the
                                // read at the page row count. This check avoids an infinite loop.
selection.push_front(RowSelection::select(remaining));
need_read
}
_ => front.row_count,
};
match self.array_reader.read_records(to_read) {
Ok(rec) if rec == 0 =>
// no more data in file, the last batch in this reader
{
break
}
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
Ok(rec) => read_records += rec,
Err(error) => return Some(Err(error.into())),
}
_ => front.row_count,
};

break to_read;
},
None => self.batch_size,
}
}
None => {
if let Err(error) = self.array_reader.read_records(self.batch_size) {
return Some(Err(error.into()));
}
}
};

match self.array_reader.next_batch(to_read) {
match self.array_reader.consume_batch() {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
Expand Down Expand Up @@ -1212,15 +1228,9 @@ mod tests {
loop {
let maybe_batch = record_reader.next();
if total_read < expected_data.len() {
let mut end =
min(total_read + opts.record_batch_size, expected_data.len());
let end = min(total_read + opts.record_batch_size, expected_data.len());
let batch = maybe_batch.unwrap().unwrap();
//TODO remove this after implement https://github.com/apache/arrow-rs/issues/2197
if opts.row_selections.is_none() {
assert_eq!(end - total_read, batch.num_rows());
} else {
end = end.min(total_read + batch.num_rows())
}
assert_eq!(end - total_read, batch.num_rows());

let mut data = vec![];
data.extend_from_slice(&expected_data[total_read..end]);
Expand Down
6 changes: 0 additions & 6 deletions parquet/src/arrow/record_reader/mod.rs
Expand Up @@ -198,12 +198,6 @@ where
self.num_records += buffered_records;
self.num_values += buffered_values;

self.consume_def_levels();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold FYI,
For example, suppose batch_size = 20: we read 15 records without consuming them,
then skip 50 records, and finally read 5 more records to complete the batch.
If we call these consume APIs during the skip, we will lose the first 15 records.

Do you think removing these is reasonable 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me

self.consume_rep_levels();
self.consume_record_data();
self.consume_bitmap();
self.reset();

let remaining = num_records - buffered_records;

if remaining == 0 {
Expand Down