
Commit

Combine multiple selections into the same batch size in skip_records (#2359)

* Combine multiple selections into the same batch size in skip_records

* Apply suggestions from code review

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Ted-Jiang and tustvold committed Aug 8, 2022
1 parent f3baeaa commit ce2bd1e
Showing 2 changed files with 46 additions and 46 deletions.
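For intuition, here is a minimal standalone sketch of the reworked `ParquetRecordBatchReader::next()` loop shown in the first diff below (plain std Rust; the `Selection` struct and `next_batch_len` helper are illustrative stand-ins, not the parquet crate's API). The reader now keeps draining the selection queue until `batch_size` rows have been read, splitting an oversized select run and pushing the remainder back, rather than emitting one possibly undersized batch per run:

```rust
use std::collections::VecDeque;

// Illustrative stand-in for parquet's RowSelection entries.
struct Selection {
    skip: bool,
    row_count: usize,
}

// Models how many rows the reworked `next()` loop reads into one batch.
fn next_batch_len(selection: &mut VecDeque<Selection>, batch_size: usize) -> usize {
    let mut read = 0;
    while read < batch_size {
        let front = match selection.pop_front() {
            Some(front) => front,
            None => break,
        };
        if front.skip {
            continue; // the real reader calls skip_records here
        }
        let need = batch_size - read;
        if front.row_count > need {
            // Split the run: take what fits, push the remainder back.
            selection.push_front(Selection {
                skip: false,
                row_count: front.row_count - need,
            });
            read += need;
        } else {
            read += front.row_count;
        }
    }
    read
}

fn main() {
    // select 20, skip 5, select 30, select 60 with batch_size = 100:
    // the old loop yielded batches of 20, 30, and 60; the combined loop
    // yields a full batch of 100 followed by the remaining 10.
    let mut sel: VecDeque<Selection> = [
        Selection { skip: false, row_count: 20 },
        Selection { skip: true, row_count: 5 },
        Selection { skip: false, row_count: 30 },
        Selection { skip: false, row_count: 60 },
    ]
    .into_iter()
    .collect();
    assert_eq!(next_batch_len(&mut sel, 100), 100);
    assert_eq!(next_batch_len(&mut sel, 100), 10);
}
```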
86 changes: 46 additions & 40 deletions parquet/src/arrow/arrow_reader.rs
@@ -287,43 +287,55 @@ impl Iterator for ParquetRecordBatchReader {
     type Item = ArrowResult<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let to_read = match self.selection.as_mut() {
-            Some(selection) => loop {
-                let front = selection.pop_front()?;
-                if front.skip {
-                    let skipped = match self.array_reader.skip_records(front.row_count) {
-                        Ok(skipped) => skipped,
-                        Err(e) => return Some(Err(e.into())),
-                    };
-
-                    if skipped != front.row_count {
-                        return Some(Err(general_err!(
-                            "failed to skip rows, expected {}, got {}",
-                            front.row_count,
-                            skipped
-                        )
-                        .into()));
-                    }
-                    continue;
-                }
-
-                // try to read record
-                let to_read = match front.row_count.checked_sub(self.batch_size) {
-                    Some(remaining) if remaining != 0 => {
-                        // if page row count less than batch_size we must set batch size to page row count.
-                        // add check avoid dead loop
-                        selection.push_front(RowSelection::select(remaining));
-                        self.batch_size
-                    }
-                    _ => front.row_count,
-                };
-
-                break to_read;
-            },
-            None => self.batch_size,
-        };
-
-        match self.array_reader.next_batch(to_read) {
+        let mut read_records = 0;
+        match self.selection.as_mut() {
+            Some(selection) => {
+                while read_records < self.batch_size && !selection.is_empty() {
+                    let front = selection.pop_front().unwrap();
+                    if front.skip {
+                        let skipped =
+                            match self.array_reader.skip_records(front.row_count) {
+                                Ok(skipped) => skipped,
+                                Err(e) => return Some(Err(e.into())),
+                            };
+
+                        if skipped != front.row_count {
+                            return Some(Err(general_err!(
+                                "failed to skip rows, expected {}, got {}",
+                                front.row_count,
+                                skipped
+                            )
+                            .into()));
+                        }
+                        continue;
+                    }
+
+                    // try to read record
+                    let need_read = self.batch_size - read_records;
+                    let to_read = match front.row_count.checked_sub(need_read) {
+                        Some(remaining) if remaining != 0 => {
+                            // if page row count less than batch_size we must set batch size to page row count.
+                            // add check avoid dead loop
+                            selection.push_front(RowSelection::select(remaining));
+                            need_read
+                        }
+                        _ => front.row_count,
+                    };
+                    match self.array_reader.read_records(to_read) {
+                        Ok(0) => break,
+                        Ok(rec) => read_records += rec,
+                        Err(error) => return Some(Err(error.into())),
+                    }
+                }
+            }
+            None => {
+                if let Err(error) = self.array_reader.read_records(self.batch_size) {
+                    return Some(Err(error.into()));
+                }
+            }
+        };
+
+        match self.array_reader.consume_batch() {
             Err(error) => Some(Err(error.into())),
             Ok(array) => {
                 let struct_array =
@@ -1212,15 +1224,9 @@ mod tests {
         loop {
             let maybe_batch = record_reader.next();
             if total_read < expected_data.len() {
-                let mut end =
-                    min(total_read + opts.record_batch_size, expected_data.len());
+                let end = min(total_read + opts.record_batch_size, expected_data.len());
                 let batch = maybe_batch.unwrap().unwrap();
-                //TODO remove this after implement https://github.com/apache/arrow-rs/issues/2197
-                if opts.row_selections.is_none() {
-                    assert_eq!(end - total_read, batch.num_rows());
-                } else {
-                    end = end.min(total_read + batch.num_rows())
-                }
+                assert_eq!(end - total_read, batch.num_rows());
 
                 let mut data = vec![];
                 data.extend_from_slice(&expected_data[total_read..end]);
6 changes: 0 additions & 6 deletions parquet/src/arrow/record_reader/mod.rs
@@ -198,12 +198,6 @@ where
         self.num_records += buffered_records;
         self.num_values += buffered_values;
 
-        self.consume_def_levels();
-        self.consume_rep_levels();
-        self.consume_record_data();
-        self.consume_bitmap();
-        self.reset();
-
         let remaining = num_records - buffered_records;
 
         if remaining == 0 {
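The deletion above is the counterpart of the `consume_batch()` call in the reader loop: because a single output batch can now interleave several reads and skips, `skip_records` must advance past unwanted rows without flushing rows already buffered by earlier reads. A toy model of that contract (`ToyReader` is hypothetical, not the crate's `RecordReader` internals):

```rust
// Toy model of the read/skip/consume contract; `ToyReader` stands in
// for the parquet crate's record reader.
struct ToyReader {
    data: Vec<i32>,
    pos: usize,
    buffered: Vec<i32>,
}

impl ToyReader {
    fn read_records(&mut self, n: usize) -> usize {
        let end = (self.pos + n).min(self.data.len());
        self.buffered.extend_from_slice(&self.data[self.pos..end]);
        let read = end - self.pos;
        self.pos = end;
        read
    }

    // Advances the cursor only; rows buffered by earlier reads survive,
    // mirroring the consume/reset calls removed in this commit.
    fn skip_records(&mut self, n: usize) -> usize {
        let end = (self.pos + n).min(self.data.len());
        let skipped = end - self.pos;
        self.pos = end;
        skipped
    }

    fn consume_batch(&mut self) -> Vec<i32> {
        std::mem::take(&mut self.buffered)
    }
}

fn main() {
    let mut reader = ToyReader {
        data: (0..10).collect(),
        pos: 0,
        buffered: Vec::new(),
    };
    reader.read_records(3); // buffer: [0, 1, 2]
    reader.skip_records(4); // buffer intact, cursor jumps to row 7
    reader.read_records(2); // buffer: [0, 1, 2, 7, 8]
    assert_eq!(reader.consume_batch(), vec![0, 1, 2, 7, 8]);
}
```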
