Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix record delimiting on row group boundaries (#2025) #2027

Merged
merged 3 commits into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/list_array.rs
Expand Up @@ -124,7 +124,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {

// The output offsets for the computed ListArray
let mut list_offsets: Vec<OffsetSize> =
Vec::with_capacity(next_batch_array.len());
Vec::with_capacity(next_batch_array.len() + 1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by fix


// The validity mask of the computed ListArray if nullable
let mut validity = self
Expand Down
53 changes: 53 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
Expand Up @@ -1528,4 +1528,57 @@ mod tests {

assert_eq!(total_rows, expected_rows);
}

#[test]
fn test_row_group_exact_multiple() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I remove the code in this PR this test fails like the following (I was expecting the reader to miss the last record)


---- arrow::arrow_reader::tests::test_row_group_exact_multiple stdout ----
thread 'arrow::arrow_reader::tests::test_row_group_exact_multiple' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError("Parquet error: Not all children array length are the same!")', parquet/src/arrow/arrow_reader.rs:1581:53
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/std/src/panicking.rs:584:5
   1: core::panicking::panic_fmt
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/core/src/panicking.rs:142:14
   2: core::result::unwrap_failed
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/core/src/result.rs:1785:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/core/src/result.rs:1078:23
   4: parquet::arrow::arrow_reader::tests::test_row_group_exact_multiple
             at ./src/arrow/arrow_reader.rs:1581:23
   5: parquet::arrow::arrow_reader::tests::test_row_group_exact_multiple::{{closure}}
             at ./src/arrow/arrow_reader.rs:1533:5
   6: core::ops::function::FnOnce::call_once
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/core/src/ops/function.rs:248:5
   7: core::ops::function::FnOnce::call_once
             at /rustc/a8314ef7d0ec7b75c336af2c9857bfaf43002bfc/library/core/src/ops/function.rs:248:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only misses the last record for the repeated array, and so you end up with this error

let schema = Arc::new(Schema::new(vec![
Field::new("int", ArrowDataType::Int32, false),
Field::new(
"list",
ArrowDataType::List(Box::new(Field::new(
"item",
ArrowDataType::Int32,
true,
))),
true,
),
]));

let mut buf = Vec::with_capacity(1024);

let mut writer = ArrowWriter::try_new(
&mut buf,
schema.clone(),
Some(
WriterProperties::builder()
.set_max_row_group_size(8)
.build(),
),
)
.unwrap();
for _ in 0..2 {
let mut int_builder = Int32Builder::new(10);
let mut list_builder = ListBuilder::new(Int32Builder::new(10));
for i in 0..10 {
int_builder.append_value(i).unwrap();
list_builder.append(true).unwrap();
}
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(int_builder.finish()),
Arc::new(list_builder.finish()),
],
)
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

let mut file_reader = ParquetFileArrowReader::try_new(Bytes::from(buf)).unwrap();
let mut record_reader = file_reader.get_record_reader(8).unwrap();
assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
assert_eq!(8, record_reader.next().unwrap().unwrap().num_rows());
assert_eq!(4, record_reader.next().unwrap().unwrap().num_rows());
}
}
106 changes: 79 additions & 27 deletions parquet/src/arrow/record_reader/mod.rs
Expand Up @@ -139,32 +139,19 @@ where

let mut records_read = 0;

// Used to mark whether we have reached the end of current
// column chunk
let mut end_of_column = false;

loop {
// Try to find some records from buffers that have been read into memory
// but not counted as seen records.
let end_of_column = !self.column_reader.as_mut().unwrap().has_next()?;

let (record_count, value_count) =
self.count_records(num_records - records_read);
self.count_records(num_records - records_read, end_of_column);

self.num_records += record_count;
self.num_values += value_count;
records_read += record_count;

if records_read == num_records {
break;
}

if end_of_column {
// Since page reader contains complete records, if we reached end of a
// page reader, we should reach the end of a record
if self.rep_levels.is_some() {
self.num_records += 1;
self.num_values = self.values_written;
records_read += 1;
}
if records_read == num_records || end_of_column {
break;
}

Expand Down Expand Up @@ -194,10 +181,7 @@ where
};

// Try to read more values from parquet pages
let values_read = self.read_one_batch(batch_size)?;
if values_read < batch_size {
Copy link
Contributor Author

@tustvold tustvold Jul 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the cause of the bug: the end of the column chunk was being detected based on whether read_one_batch returned fewer values than the batch size. If the batch_size happens to exactly match the remaining records this would fail, and the reader would miss the last record in the chunk

end_of_column = true;
}
self.read_one_batch(batch_size)?;
}

Ok(records_read)
Expand All @@ -210,7 +194,14 @@ where
/// Number of records skipped
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
// First need to clear the buffer
let (buffered_records, buffered_values) = self.count_records(num_records);
let end_of_column = match self.column_reader.as_mut() {
Some(reader) => !reader.has_next()?,
None => return Ok(0),
};

let (buffered_records, buffered_values) =
self.count_records(num_records, end_of_column);

self.num_records += buffered_records;
self.num_values += buffered_values;

Expand All @@ -226,10 +217,11 @@ where
return Ok(buffered_records);
}

let skipped = match self.column_reader.as_mut() {
Some(column_reader) => column_reader.skip_records(remaining)?,
None => 0,
};
let skipped = self
.column_reader
.as_mut()
.unwrap()
.skip_records(remaining)?;

Ok(skipped + buffered_records)
}
Expand Down Expand Up @@ -334,8 +326,15 @@ where
/// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
/// and returns the number of "complete" records along with the corresponding number of values
///
/// If `end_of_column` is true it indicates that there are no further values for this
/// column chunk beyond what is currently in the buffers
///
/// A "complete" record is one where the buffer contains a subsequent repetition level of 0
fn count_records(&self, records_to_read: usize) -> (usize, usize) {
fn count_records(
&self,
records_to_read: usize,
end_of_column: bool,
) -> (usize, usize) {
match self.rep_levels.as_ref() {
Some(buf) => {
let buf = buf.as_slice();
Expand All @@ -359,6 +358,15 @@ where
}
}

// If reached end of column chunk => end of a record
if records_read != records_to_read
&& end_of_column
&& self.values_written != 0
{
records_read += 1;
end_of_last_record = self.values_written;
}

(records_read, end_of_last_record - self.num_values)
}
None => {
Expand Down Expand Up @@ -731,4 +739,48 @@ mod tests {
assert_eq!(5000, record_reader.num_values());
}
}

#[test]
fn test_row_group_boundary() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test passes on master, without the changes in this PR 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewhat expected, I'll see if I can make it fail

// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_struct {
REPEATED INT32 leaf;
}
}
";

let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();

let values = [1, 2, 3];
let def_levels = [1i16, 0i16, 1i16, 2i16, 2i16, 1i16, 2i16];
let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 0i16, 1i16];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
pb.add_rep_levels(2, &rep_levels);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(record_reader.read_records(4).unwrap(), 4);
assert_eq!(record_reader.num_records(), 4);
assert_eq!(record_reader.num_values(), 7);

let mut record_reader = RecordReader::<Int32Type>::new(desc);
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(record_reader.read_records(3).unwrap(), 3);
assert_eq!(record_reader.num_records(), 3);
assert_eq!(record_reader.num_values(), 5);

assert_eq!(record_reader.read_records(3).unwrap(), 1);
assert_eq!(record_reader.num_records(), 4);
assert_eq!(record_reader.num_values(), 7);
}
}
2 changes: 1 addition & 1 deletion parquet/src/column/reader.rs
Expand Up @@ -474,7 +474,7 @@ where
}

#[inline]
fn has_next(&mut self) -> Result<bool> {
pub(crate) fn has_next(&mut self) -> Result<bool> {
if self.num_buffered_values == 0
|| self.num_buffered_values == self.num_decoded_values
{
Expand Down